author     Paul Buetow <paul@buetow.org>  2026-03-13 10:05:47 +0200
committer  Paul Buetow <paul@buetow.org>  2026-03-13 10:05:47 +0200
commit     6c3bc11f736040a09fd839832a6be01e434e8aab
tree       6b856c2f79d2f75ccd8ba89c638ee18839b4d061
parent     a5a405d79fe3d9e0c6ea081b425d36bd67d8671d
Stop stale query work promptly on generation cancel
-rw-r--r--  internal/ctxutil/sleep.go                                  | 29
-rw-r--r--  internal/ctxutil/sleep_test.go                             | 32
-rw-r--r--  internal/io/fs/readfile.go                                 |  5
-rw-r--r--  internal/io/fs/readfile_processor.go                       |  5
-rw-r--r--  internal/io/fs/readfile_processor_optimized.go             | 11
-rw-r--r--  internal/mapr/server/turbo_aggregate.go                    | 45
-rw-r--r--  internal/mapr/server/turbo_aggregate_test.go               | 19
-rw-r--r--  internal/server/handlers/readcommand.go                    | 27
-rw-r--r--  internal/server/handlers/readcommand_cancellation_test.go  | 69
-rw-r--r--  internal/server/handlers/sessioncommand.go                 |  2
-rw-r--r--  internal/server/handlers/turbo_writer.go                   | 69
-rw-r--r--  internal/server/handlers/turbo_writer_test.go              | 44
12 files changed, 322 insertions(+), 35 deletions(-)
diff --git a/internal/ctxutil/sleep.go b/internal/ctxutil/sleep.go
new file mode 100644
index 0000000..6965e8d
--- /dev/null
+++ b/internal/ctxutil/sleep.go
@@ -0,0 +1,29 @@
+package ctxutil
+
+import (
+ "context"
+ "time"
+)
+
+// Sleep waits for the delay or exits early when the context is canceled.
+// It returns true when the full delay elapsed.
+func Sleep(ctx context.Context, delay time.Duration) bool {
+ if delay <= 0 {
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ return true
+ }
+ }
+
+ timer := time.NewTimer(delay)
+ defer timer.Stop()
+
+ select {
+ case <-ctx.Done():
+ return false
+ case <-timer.C:
+ return true
+ }
+}
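
The helper above is the building block for the rest of the patch: every retry-style time.Sleep below becomes an interruptible wait. A minimal sketch of the intended call pattern (the tryOnce/retryLoop names and the loop are illustrative, not from dtail; only ctxutil.Sleep is the helper added above):

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"

    	"github.com/mimecast/dtail/internal/ctxutil"
    )

    // tryOnce stands in for one retryable unit of work (hypothetical).
    func tryOnce() error { return errors.New("not ready yet") }

    // retryLoop shows the replacement pattern used throughout this commit:
    // the inter-attempt delay is interruptible, so a canceled context ends
    // the loop within one tick instead of after a full blocking sleep.
    func retryLoop(ctx context.Context) error {
    	for {
    		if err := tryOnce(); err == nil {
    			return nil
    		}
    		if !ctxutil.Sleep(ctx, 100*time.Millisecond) {
    			return ctx.Err() // canceled mid-delay
    		}
    	}
    }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    	defer cancel()
    	fmt.Println(retryLoop(ctx)) // prints "context deadline exceeded"
    }
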
diff --git a/internal/ctxutil/sleep_test.go b/internal/ctxutil/sleep_test.go
new file mode 100644
index 0000000..f32b22c
--- /dev/null
+++ b/internal/ctxutil/sleep_test.go
@@ -0,0 +1,32 @@
+package ctxutil
+
+import (
+ "context"
+ "testing"
+ "time"
+)
+
+func TestSleepReturnsEarlyOnCancel(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ start := time.Now()
+ if Sleep(ctx, time.Second) {
+ t.Fatal("Sleep should stop when the context is canceled")
+ }
+ if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
+ t.Fatalf("Sleep took too long to return after cancellation: %v", elapsed)
+ }
+}
+
+func TestSleepWaitsForDelay(t *testing.T) {
+ ctx := context.Background()
+
+ start := time.Now()
+ if !Sleep(ctx, 20*time.Millisecond) {
+ t.Fatal("Sleep should report success when the delay elapses")
+ }
+ if elapsed := time.Since(start); elapsed < 15*time.Millisecond {
+ t.Fatalf("Sleep returned too early: %v", elapsed)
+ }
+}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 47a999d..0ec2eca 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/ctxutil"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -206,7 +207,9 @@ func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
if abortReading == status {
return err
}
- time.Sleep(time.Millisecond * 100)
+ if !ctxutil.Sleep(ctx, 100*time.Millisecond) {
+ return nil
+ }
continue
}
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
index 8f56bdd..672d6d8 100644
--- a/internal/io/fs/readfile_processor.go
+++ b/internal/io/fs/readfile_processor.go
@@ -8,6 +8,7 @@ import (
"os"
"time"
+ "github.com/mimecast/dtail/internal/ctxutil"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -77,7 +78,9 @@ func (f *readFile) readWithProcessor(ctx context.Context, fd *os.File, reader *b
if abortReading == status {
return err
}
- time.Sleep(time.Millisecond * 100)
+ if !ctxutil.Sleep(ctx, 100*time.Millisecond) {
+ return nil
+ }
continue
}
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 2e880e7..1e553ee 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -8,6 +8,7 @@ import (
"os"
"time"
+ "github.com/mimecast/dtail/internal/ctxutil"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -346,6 +347,8 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
return err
}
+ waitForMoreData := true
+
// EOF handling
select {
case <-ctx.Done():
@@ -354,8 +357,12 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
- case <-time.After(100 * time.Millisecond):
- // Continue reading after a short delay
+ waitForMoreData = false
+ default:
+ }
+
+ if waitForMoreData && !ctxutil.Sleep(ctx, 100*time.Millisecond) {
+ return nil
}
}
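
The hunk above reshapes EOF handling in the optimized tail path: instead of blocking inside the select on time.After, the select becomes a non-blocking poll (via the added default case), and the 100ms wait moves to a single interruptible sleep that is skipped when housekeeping already ran. A distilled sketch of that shape, assuming a surrounding read loop (the housekeeping channel is a stand-in for the truncation-check case in the diff):

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/mimecast/dtail/internal/ctxutil"
    )

    // pollThenWait polls cancellation and housekeeping without blocking,
    // then performs one interruptible sleep only if nothing else fired.
    // It returns false when the caller should stop tailing.
    func pollThenWait(ctx context.Context, housekeeping <-chan struct{}) bool {
    	waitForMoreData := true

    	select {
    	case <-ctx.Done():
    		return false
    	case <-housekeeping:
    		waitForMoreData = false // work was done; re-read immediately
    	default:
    	}

    	return !waitForMoreData || ctxutil.Sleep(ctx, 100*time.Millisecond)
    }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	cancel()
    	fmt.Println(pollThenWait(ctx, make(chan struct{}))) // false: stop tailing
    }
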
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go
index 9b5afe7..188be1c 100644
--- a/internal/mapr/server/turbo_aggregate.go
+++ b/internal/mapr/server/turbo_aggregate.go
@@ -56,6 +56,21 @@ type rawLine struct {
sourceID string
}
+func (a *TurboAggregate) stopping() bool {
+ select {
+ case <-a.done.Done():
+ return true
+ default:
+ return false
+ }
+}
+
+func (a *TurboAggregate) stopSerializeTicker() {
+ if a.serializeTicker != nil {
+ a.serializeTicker.Stop()
+ }
+}
+
// NewTurboAggregate returns a new turbo mode aggregator.
func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) {
query, err := mapr.NewQuery(queryStr)
@@ -130,9 +145,7 @@ func (a *TurboAggregate) Shutdown() {
a.done.Shutdown()
// Stop the ticker
- if a.serializeTicker != nil {
- a.serializeTicker.Stop()
- }
+ a.stopSerializeTicker()
// Wait for active processors to finish
for a.activeProcessors.Load() > 0 {
@@ -185,6 +198,19 @@ func (a *TurboAggregate) Shutdown() {
}
}
+// Abort stops background processing without waiting for final serialization.
+// Session generation replacement uses this to preempt old query work immediately.
+func (a *TurboAggregate) Abort() {
+ dlog.Server.Info("TurboAggregate: Abort called",
+ "linesProcessed", a.linesProcessed.Load(),
+ "filesProcessed", a.filesProcessed.Load(),
+ "activeProcessors", a.activeProcessors.Load(),
+ "currentGroups", a.countGroups())
+
+ a.done.Shutdown()
+ a.stopSerializeTicker()
+}
+
// Start the turbo aggregation.
func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) {
a.maprMessages = maprMessages
@@ -206,6 +232,10 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string)
// ProcessLineDirect processes a line directly without channels.
// This is called from the TurboAggregateProcessor.
func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error {
+ if a.stopping() {
+ return nil
+ }
+
// Increment counter first
a.linesProcessed.Add(1)
@@ -642,6 +672,11 @@ func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *Turbo
// ProcessLine processes a line directly to the turbo aggregate.
func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ if p.aggregate.stopping() {
+ pool.RecycleBytesBuffer(lineContent)
+ return nil
+ }
+
// Debug: Log when ProcessLine is called
if lineNum == 1 || lineNum%1000 == 0 {
dlog.Server.Info("TurboAggregateProcessor: ProcessLine called",
@@ -661,6 +696,10 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum
// Flush ensures all buffered data is processed.
func (p *TurboAggregateProcessor) Flush() error {
+ if p.aggregate.stopping() {
+ return nil
+ }
+
// Log flush call for debugging
dlog.Server.Info("TurboAggregateProcessor: Flush called",
"globID", p.globID,
diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go
index f556f50..7ae4b5a 100644
--- a/internal/mapr/server/turbo_aggregate_test.go
+++ b/internal/mapr/server/turbo_aggregate_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
@@ -311,3 +312,21 @@ func TestTurboAggregateConcurrency(t *testing.T) {
t.Error("Did not find expected count of 1000 in results")
}
}
+
+func TestTurboAggregateAbortReturnsPromptlyWithActiveProcessors(t *testing.T) {
+ aggregate := &TurboAggregate{}
+ aggregate.done = internal.NewDone()
+ aggregate.activeProcessors.Store(1)
+
+ done := make(chan struct{})
+ go func() {
+ aggregate.Abort()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(100 * time.Millisecond):
+ t.Fatal("Abort did not return promptly while processors were still active")
+ }
+}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 9c85889..9677718 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/ctxutil"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -88,7 +89,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
paths, err := filepath.Glob(glob)
if err != nil {
dlog.Server.Warn(r.server.LogContext(), glob, err)
- time.Sleep(retryInterval)
+ if !ctxutil.Sleep(ctx, retryInterval) {
+ return
+ }
continue
}
@@ -101,7 +104,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
return
default:
}
- time.Sleep(retryInterval)
+ if !ctxutil.Sleep(ctx, retryInterval) {
+ return
+ }
continue
}
@@ -132,6 +137,12 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths))
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
// In turbo mode, signal EOF once all pending file work is drained.
// Active command count may still include side-effect commands (for example AUTHKEY),
// so relying on "active == 1" can skip EOF signaling and lead to dropped output.
@@ -160,7 +171,9 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
if r.server.WaitForTurboEOFAck(timeout) {
dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged")
// Allow transport buffers to flush after acknowledgement.
- time.Sleep(r.server.ShutdownTurboSerializeWait())
+ if !ctxutil.Sleep(ctx, r.server.ShutdownTurboSerializeWait()) {
+ return
+ }
} else {
dlog.Server.Warn(
r.server.LogContext(),
@@ -305,7 +318,9 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
}
}
- time.Sleep(r.server.ReadRetryInterval())
+ if !ctxutil.Sleep(ctx, r.server.ReadRetryInterval()) {
+ return
+ }
dlog.Server.Info(path, globID, "Reading file again")
}
}
@@ -362,7 +377,9 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
// Skip this delay in serverless mode since data is written directly to stdout
if !r.server.Serverless() {
dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission")
- time.Sleep(r.server.TurboDataTransmissionDelay())
+ if !ctxutil.Sleep(ctx, r.server.TurboDataTransmissionDelay()) {
+ return startErr
+ }
}
return startErr
diff --git a/internal/server/handlers/readcommand_cancellation_test.go b/internal/server/handlers/readcommand_cancellation_test.go
new file mode 100644
index 0000000..4b77224
--- /dev/null
+++ b/internal/server/handlers/readcommand_cancellation_test.go
@@ -0,0 +1,69 @@
+package handlers
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/fs"
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/omode"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+type retryOnlyFileReader struct{}
+
+func (retryOnlyFileReader) Start(context.Context, lcontext.LContext, chan<- *line.Line, regex.Regex) error {
+ return nil
+}
+
+func (retryOnlyFileReader) StartWithProcessor(context.Context, lcontext.LContext, line.Processor, regex.Regex) error {
+ return nil
+}
+
+func (retryOnlyFileReader) StartWithProcessorOptimized(context.Context, lcontext.LContext, line.Processor, regex.Regex) error {
+ return nil
+}
+
+func (retryOnlyFileReader) FilePath() string {
+ return ""
+}
+
+func (retryOnlyFileReader) Retry() bool {
+ return true
+}
+
+var _ fs.FileReader = retryOnlyFileReader{}
+
+func TestExecuteReadLoopStopsPromptlyWhenContextCanceledDuringRetrySleep(t *testing.T) {
+ handler := newSessionTestHandler("readcommand-cancel-user")
+ handler.serverCfg.ReadRetryIntervalMs = 1000
+
+ command := newReadCommand(handler, omode.TailClient)
+ reader := retryOnlyFileReader{}
+
+ ctx, cancel := context.WithCancel(context.Background())
+ done := make(chan struct{})
+ strategyCalls := 0
+
+ go func() {
+ command.executeReadLoop(ctx, lcontext.LContext{}, "/var/log/app.log", "app.log", regex.NewNoop(), reader,
+ func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error {
+ strategyCalls++
+ cancel()
+ return nil
+ })
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(150 * time.Millisecond):
+ t.Fatal("executeReadLoop did not stop promptly after cancellation")
+ }
+
+ if strategyCalls != 1 {
+ t.Fatalf("expected one read attempt before cancellation, got %d", strategyCalls)
+ }
+}
diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go
index 25b8d15..76c4109 100644
--- a/internal/server/handlers/sessioncommand.go
+++ b/internal/server/handlers/sessioncommand.go
@@ -253,7 +253,7 @@ func (h *ServerHandler) resetSessionAggregates() {
h.aggregate = nil
}
if h.turboAggregate != nil {
- h.turboAggregate.Shutdown()
+ h.turboAggregate.Abort()
h.turboAggregate = nil
}
}
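
This one-line swap is the heart of the fix: resetSessionAggregates runs when a client starts a new query generation, and the old Shutdown path waits for activeProcessors to drain (see the loop kept in the turbo_aggregate.go hunk above). If those processors are themselves blocked, the reset stalls; Abort signals done and returns at once, and the stopping() guards let the orphaned processors drop their remaining work.
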
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index fa12f72..f09a2af 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -399,23 +399,29 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
data := make([]byte, w.writeBuf.Len())
copy(data, w.writeBuf.Bytes())
+ encoded := encodeGeneratedBytes(w.generation, data)
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sending to turboLines channel", "dataLen", len(data))
- // Send data to turbo channel, retry once if full
- select {
- case w.turboLines <- encodeGeneratedBytes(w.generation, data):
- dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
- w.writeBuf.Reset()
- return nil
- default:
- // Channel full, wait a bit and retry
- dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
- time.Sleep(time.Millisecond)
- w.turboLines <- encodeGeneratedBytes(w.generation, data)
- dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry")
- w.writeBuf.Reset()
- return nil
+ for {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "generation became stale while waiting to send")
+ w.writeBuf.Reset()
+ return nil
+ }
+
+ select {
+ case w.turboLines <- encoded:
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
+ w.writeBuf.Reset()
+ return nil
+ default:
+ dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
+ if !waitForGenerationRetry(w.generation, w.activeGeneration, time.Millisecond) {
+ w.writeBuf.Reset()
+ return nil
+ }
+ }
}
}
@@ -449,12 +455,9 @@ func (w *TurboNetworkWriter) Flush() error {
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len())
if w.turboLines != nil {
- data := make([]byte, w.writeBuf.Len())
- copy(data, w.writeBuf.Bytes())
-
- // Force send the data
- w.turboLines <- encodeGeneratedBytes(w.generation, data)
- w.writeBuf.Reset()
+ if err := w.sendToTurboChannel(); err != nil {
+ return err
+ }
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
}
}
@@ -464,15 +467,22 @@ func (w *TurboNetworkWriter) Flush() error {
if w.turboLines != nil {
// Wait until channel has been drained somewhat
for i := 0; i < 100 && len(w.turboLines) > 900; i++ {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.turboLines))
- time.Sleep(10 * time.Millisecond)
+ if !waitForGenerationRetry(w.generation, w.activeGeneration, 10*time.Millisecond) {
+ return nil
+ }
}
dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.turboLines))
}
// Wait a bit to ensure data is processed
// This is crucial for integration tests
- time.Sleep(10 * time.Millisecond)
+ if !waitForGenerationRetry(w.generation, w.activeGeneration, 10*time.Millisecond) {
+ return nil
+ }
dlog.Server.Trace("TurboNetworkWriter.Flush", "completed")
return nil
@@ -491,6 +501,21 @@ func shouldWriteGeneration(generation uint64, activeGeneration func() uint64) bo
return currentGeneration == generation
}
+func waitForGenerationRetry(generation uint64, activeGeneration func() uint64, delay time.Duration) bool {
+ if !shouldWriteGeneration(generation, activeGeneration) {
+ return false
+ }
+ if delay <= 0 {
+ return shouldWriteGeneration(generation, activeGeneration)
+ }
+
+ timer := time.NewTimer(delay)
+ defer timer.Stop()
+ <-timer.C
+
+ return shouldWriteGeneration(generation, activeGeneration)
+}
+
// DirectLineProcessor processes lines directly without channels in turbo mode
type DirectLineProcessor struct {
writer TurboWriter
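
The new send loop above replaces an unconditional blocking retry that could wedge a stale writer forever on a full channel. Distilled to its essentials (channel wiring simplified; trySend and its parameters are illustrative names, not dtail's API):

    package main

    import (
    	"fmt"
    	"time"
    )

    // trySend keeps attempting a non-blocking send, but gives up as soon
    // as the writer's generation is no longer the active one, so a stale
    // writer can never block indefinitely on a full channel.
    func trySend(ch chan []byte, data []byte, gen uint64, active func() uint64) bool {
    	for {
    		if active() != gen {
    			return false // stale generation: drop the data
    		}
    		select {
    		case ch <- data:
    			return true
    		default:
    			time.Sleep(time.Millisecond) // brief pause, then re-check
    		}
    	}
    }

    func main() {
    	ch := make(chan []byte, 1)
    	ch <- []byte("occupied")             // channel is full
    	active := func() uint64 { return 2 } // generation already replaced
    	fmt.Println("sent:", trySend(ch, []byte("stale"), 1, active)) // sent: false
    }
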
diff --git a/internal/server/handlers/turbo_writer_test.go b/internal/server/handlers/turbo_writer_test.go
index d8dc8a9..23a07d4 100644
--- a/internal/server/handlers/turbo_writer_test.go
+++ b/internal/server/handlers/turbo_writer_test.go
@@ -3,8 +3,11 @@ package handlers
import (
"bytes"
"strings"
+ "sync/atomic"
"testing"
+ "time"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -353,6 +356,47 @@ func TestTurboChannelWriter_Stats(t *testing.T) {
}
}
+func TestTurboNetworkWriterStopsWaitingWhenGenerationBecomesStale(t *testing.T) {
+ originalLogger := dlog.Server
+ dlog.Server = &dlog.DLog{}
+ t.Cleanup(func() {
+ dlog.Server = originalLogger
+ })
+
+ turboLines := make(chan []byte, 1)
+ turboLines <- []byte("occupied")
+
+ var activeGeneration atomic.Uint64
+ activeGeneration.Store(1)
+
+ writer := &TurboNetworkWriter{
+ turboLines: turboLines,
+ generation: 1,
+ activeGeneration: func() uint64 {
+ return activeGeneration.Load()
+ },
+ }
+
+ go func() {
+ time.Sleep(10 * time.Millisecond)
+ activeGeneration.Store(2)
+ }()
+
+ done := make(chan error, 1)
+ go func() {
+ done <- writer.WriteLineData([]byte("stale line"), 1, "app.log")
+ }()
+
+ select {
+ case err := <-done:
+ if err != nil {
+ t.Fatalf("WriteLineData returned unexpected error: %v", err)
+ }
+ case <-time.After(150 * time.Millisecond):
+ t.Fatal("WriteLineData did not stop after the generation became stale")
+ }
+}
+
// TestDirectLineProcessor tests the line processor wrapper
// Note: Skipped because DirectLineProcessor uses dlog.Server which requires initialization
func TestDirectLineProcessor(t *testing.T) {