diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 10:05:47 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 10:05:47 +0200 |
| commit | 6c3bc11f736040a09fd839832a6be01e434e8aab (patch) | |
| tree | 6b856c2f79d2f75ccd8ba89c638ee18839b4d061 | |
| parent | a5a405d79fe3d9e0c6ea081b425d36bd67d8671d (diff) | |
Stop stale query work promptly on generation cancel
| -rw-r--r-- | internal/ctxutil/sleep.go | 29 | ||||
| -rw-r--r-- | internal/ctxutil/sleep_test.go | 32 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 5 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 5 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 11 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 45 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate_test.go | 19 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 27 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_cancellation_test.go | 69 | ||||
| -rw-r--r-- | internal/server/handlers/sessioncommand.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 69 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer_test.go | 44 |
12 files changed, 322 insertions, 35 deletions
diff --git a/internal/ctxutil/sleep.go b/internal/ctxutil/sleep.go new file mode 100644 index 0000000..6965e8d --- /dev/null +++ b/internal/ctxutil/sleep.go @@ -0,0 +1,29 @@ +package ctxutil + +import ( + "context" + "time" +) + +// Sleep waits for the delay or exits early when the context is canceled. +// It returns true when the full delay elapsed. +func Sleep(ctx context.Context, delay time.Duration) bool { + if delay <= 0 { + select { + case <-ctx.Done(): + return false + default: + return true + } + } + + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} diff --git a/internal/ctxutil/sleep_test.go b/internal/ctxutil/sleep_test.go new file mode 100644 index 0000000..f32b22c --- /dev/null +++ b/internal/ctxutil/sleep_test.go @@ -0,0 +1,32 @@ +package ctxutil + +import ( + "context" + "testing" + "time" +) + +func TestSleepReturnsEarlyOnCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + start := time.Now() + if Sleep(ctx, time.Second) { + t.Fatal("Sleep should stop when the context is canceled") + } + if elapsed := time.Since(start); elapsed > 50*time.Millisecond { + t.Fatalf("Sleep took too long to return after cancellation: %v", elapsed) + } +} + +func TestSleepWaitsForDelay(t *testing.T) { + ctx := context.Background() + + start := time.Now() + if !Sleep(ctx, 20*time.Millisecond) { + t.Fatal("Sleep should report success when the delay elapses") + } + if elapsed := time.Since(start); elapsed < 15*time.Millisecond { + t.Fatalf("Sleep returned too early: %v", elapsed) + } +} diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 47a999d..0ec2eca 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/ctxutil" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -206,7 +207,9 @@ func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, if abortReading == status { return err } - time.Sleep(time.Millisecond * 100) + if !ctxutil.Sleep(ctx, 100*time.Millisecond) { + return nil + } continue } diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index 8f56bdd..672d6d8 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -8,6 +8,7 @@ import ( "os" "time" + "github.com/mimecast/dtail/internal/ctxutil" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -77,7 +78,9 @@ func (f *readFile) readWithProcessor(ctx context.Context, fd *os.File, reader *b if abortReading == status { return err } - time.Sleep(time.Millisecond * 100) + if !ctxutil.Sleep(ctx, 100*time.Millisecond) { + return nil + } continue } diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 2e880e7..1e553ee 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -8,6 +8,7 @@ import ( "os" "time" + "github.com/mimecast/dtail/internal/ctxutil" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -346,6 +347,8 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, return err } + waitForMoreData := true + // EOF handling select { case <-ctx.Done(): @@ -354,8 +357,12 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, if isTruncated, err := f.truncated(fd); isTruncated { return err } - case <-time.After(100 * time.Millisecond): - // Continue reading after a short delay + waitForMoreData = false + default: + } + + if waitForMoreData && !ctxutil.Sleep(ctx, 100*time.Millisecond) { + return nil } } diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go index 9b5afe7..188be1c 100644 --- a/internal/mapr/server/turbo_aggregate.go +++ b/internal/mapr/server/turbo_aggregate.go @@ -56,6 +56,21 @@ type rawLine struct { sourceID string } +func (a *TurboAggregate) stopping() bool { + select { + case <-a.done.Done(): + return true + default: + return false + } +} + +func (a *TurboAggregate) stopSerializeTicker() { + if a.serializeTicker != nil { + a.serializeTicker.Stop() + } +} + // NewTurboAggregate returns a new turbo mode aggregator. func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) { query, err := mapr.NewQuery(queryStr) @@ -130,9 +145,7 @@ func (a *TurboAggregate) Shutdown() { a.done.Shutdown() // Stop the ticker - if a.serializeTicker != nil { - a.serializeTicker.Stop() - } + a.stopSerializeTicker() // Wait for active processors to finish for a.activeProcessors.Load() > 0 { @@ -185,6 +198,19 @@ func (a *TurboAggregate) Shutdown() { } } +// Abort stops background processing without waiting for final serialization. +// Session generation replacement uses this to preempt old query work immediately. +func (a *TurboAggregate) Abort() { + dlog.Server.Info("TurboAggregate: Abort called", + "linesProcessed", a.linesProcessed.Load(), + "filesProcessed", a.filesProcessed.Load(), + "activeProcessors", a.activeProcessors.Load(), + "currentGroups", a.countGroups()) + + a.done.Shutdown() + a.stopSerializeTicker() +} + // Start the turbo aggregation. func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) { a.maprMessages = maprMessages @@ -206,6 +232,10 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) // ProcessLineDirect processes a line directly without channels. // This is called from the TurboAggregateProcessor. func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error { + if a.stopping() { + return nil + } + // Increment counter first a.linesProcessed.Add(1) @@ -642,6 +672,11 @@ func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *Turbo // ProcessLine processes a line directly to the turbo aggregate. func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error { + if p.aggregate.stopping() { + pool.RecycleBytesBuffer(lineContent) + return nil + } + // Debug: Log when ProcessLine is called if lineNum == 1 || lineNum%1000 == 0 { dlog.Server.Info("TurboAggregateProcessor: ProcessLine called", @@ -661,6 +696,10 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum // Flush ensures all buffered data is processed. func (p *TurboAggregateProcessor) Flush() error { + if p.aggregate.stopping() { + return nil + } + // Log flush call for debugging dlog.Server.Info("TurboAggregateProcessor: Flush called", "globID", p.globID, diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go index f556f50..7ae4b5a 100644 --- a/internal/mapr/server/turbo_aggregate_test.go +++ b/internal/mapr/server/turbo_aggregate_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" @@ -311,3 +312,21 @@ func TestTurboAggregateConcurrency(t *testing.T) { t.Error("Did not find expected count of 1000 in results") } } + +func TestTurboAggregateAbortReturnsPromptlyWithActiveProcessors(t *testing.T) { + aggregate := &TurboAggregate{} + aggregate.done = internal.NewDone() + aggregate.activeProcessors.Store(1) + + done := make(chan struct{}) + go func() { + aggregate.Abort() + close(done) + }() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Fatal("Abort did not return promptly while processors were still active") + } +} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 9c85889..9677718 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/ctxutil" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" @@ -88,7 +89,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, paths, err := filepath.Glob(glob) if err != nil { dlog.Server.Warn(r.server.LogContext(), glob, err) - time.Sleep(retryInterval) + if !ctxutil.Sleep(ctx, retryInterval) { + return + } continue } @@ -101,7 +104,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, return default: } - time.Sleep(retryInterval) + if !ctxutil.Sleep(ctx, retryInterval) { + return + } continue } @@ -132,6 +137,12 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths)) + select { + case <-ctx.Done(): + return + default: + } + // In turbo mode, signal EOF once all pending file work is drained. // Active command count may still include side-effect commands (for example AUTHKEY), // so relying on "active == 1" can skip EOF signaling and lead to dropped output. @@ -160,7 +171,9 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, if r.server.WaitForTurboEOFAck(timeout) { dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged") // Allow transport buffers to flush after acknowledgement. - time.Sleep(r.server.ShutdownTurboSerializeWait()) + if !ctxutil.Sleep(ctx, r.server.ShutdownTurboSerializeWait()) { + return + } } else { dlog.Server.Warn( r.server.LogContext(), @@ -305,7 +318,9 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext } } - time.Sleep(r.server.ReadRetryInterval()) + if !ctxutil.Sleep(ctx, r.server.ReadRetryInterval()) { + return + } dlog.Server.Info(path, globID, "Reading file again") } } @@ -362,7 +377,9 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri // Skip this delay in serverless mode since data is written directly to stdout if !r.server.Serverless() { dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission") - time.Sleep(r.server.TurboDataTransmissionDelay()) + if !ctxutil.Sleep(ctx, r.server.TurboDataTransmissionDelay()) { + return startErr + } } return startErr diff --git a/internal/server/handlers/readcommand_cancellation_test.go b/internal/server/handlers/readcommand_cancellation_test.go new file mode 100644 index 0000000..4b77224 --- /dev/null +++ b/internal/server/handlers/readcommand_cancellation_test.go @@ -0,0 +1,69 @@ +package handlers + +import ( + "context" + "testing" + "time" + + "github.com/mimecast/dtail/internal/io/fs" + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/regex" +) + +type retryOnlyFileReader struct{} + +func (retryOnlyFileReader) Start(context.Context, lcontext.LContext, chan<- *line.Line, regex.Regex) error { + return nil +} + +func (retryOnlyFileReader) StartWithProcessor(context.Context, lcontext.LContext, line.Processor, regex.Regex) error { + return nil +} + +func (retryOnlyFileReader) StartWithProcessorOptimized(context.Context, lcontext.LContext, line.Processor, regex.Regex) error { + return nil +} + +func (retryOnlyFileReader) FilePath() string { + return "" +} + +func (retryOnlyFileReader) Retry() bool { + return true +} + +var _ fs.FileReader = retryOnlyFileReader{} + +func TestExecuteReadLoopStopsPromptlyWhenContextCanceledDuringRetrySleep(t *testing.T) { + handler := newSessionTestHandler("readcommand-cancel-user") + handler.serverCfg.ReadRetryIntervalMs = 1000 + + command := newReadCommand(handler, omode.TailClient) + reader := retryOnlyFileReader{} + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + strategyCalls := 0 + + go func() { + command.executeReadLoop(ctx, lcontext.LContext{}, "/var/log/app.log", "app.log", regex.NewNoop(), reader, + func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error { + strategyCalls++ + cancel() + return nil + }) + close(done) + }() + + select { + case <-done: + case <-time.After(150 * time.Millisecond): + t.Fatal("executeReadLoop did not stop promptly after cancellation") + } + + if strategyCalls != 1 { + t.Fatalf("expected one read attempt before cancellation, got %d", strategyCalls) + } +} diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go index 25b8d15..76c4109 100644 --- a/internal/server/handlers/sessioncommand.go +++ b/internal/server/handlers/sessioncommand.go @@ -253,7 +253,7 @@ func (h *ServerHandler) resetSessionAggregates() { h.aggregate = nil } if h.turboAggregate != nil { - h.turboAggregate.Shutdown() + h.turboAggregate.Abort() h.turboAggregate = nil } } diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index fa12f72..f09a2af 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -399,23 +399,29 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { data := make([]byte, w.writeBuf.Len()) copy(data, w.writeBuf.Bytes()) + encoded := encodeGeneratedBytes(w.generation, data) dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sending to turboLines channel", "dataLen", len(data)) - // Send data to turbo channel, retry once if full - select { - case w.turboLines <- encodeGeneratedBytes(w.generation, data): - dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully") - w.writeBuf.Reset() - return nil - default: - // Channel full, wait a bit and retry - dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry") - time.Sleep(time.Millisecond) - w.turboLines <- encodeGeneratedBytes(w.generation, data) - dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry") - w.writeBuf.Reset() - return nil + for { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "generation became stale while waiting to send") + w.writeBuf.Reset() + return nil + } + + select { + case w.turboLines <- encoded: + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully") + w.writeBuf.Reset() + return nil + default: + dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry") + if !waitForGenerationRetry(w.generation, w.activeGeneration, time.Millisecond) { + w.writeBuf.Reset() + return nil + } + } } } @@ -449,12 +455,9 @@ func (w *TurboNetworkWriter) Flush() error { dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len()) if w.turboLines != nil { - data := make([]byte, w.writeBuf.Len()) - copy(data, w.writeBuf.Bytes()) - - // Force send the data - w.turboLines <- encodeGeneratedBytes(w.generation, data) - w.writeBuf.Reset() + if err := w.sendToTurboChannel(); err != nil { + return err + } dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel") } } @@ -464,15 +467,22 @@ func (w *TurboNetworkWriter) Flush() error { if w.turboLines != nil { // Wait until channel has been drained somewhat for i := 0; i < 100 && len(w.turboLines) > 900; i++ { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.turboLines)) - time.Sleep(10 * time.Millisecond) + if !waitForGenerationRetry(w.generation, w.activeGeneration, 10*time.Millisecond) { + return nil + } } dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.turboLines)) } // Wait a bit to ensure data is processed // This is crucial for integration tests - time.Sleep(10 * time.Millisecond) + if !waitForGenerationRetry(w.generation, w.activeGeneration, 10*time.Millisecond) { + return nil + } dlog.Server.Trace("TurboNetworkWriter.Flush", "completed") return nil @@ -491,6 +501,21 @@ func shouldWriteGeneration(generation uint64, activeGeneration func() uint64) bo return currentGeneration == generation } +func waitForGenerationRetry(generation uint64, activeGeneration func() uint64, delay time.Duration) bool { + if !shouldWriteGeneration(generation, activeGeneration) { + return false + } + if delay <= 0 { + return shouldWriteGeneration(generation, activeGeneration) + } + + timer := time.NewTimer(delay) + defer timer.Stop() + <-timer.C + + return shouldWriteGeneration(generation, activeGeneration) +} + // DirectLineProcessor processes lines directly without channels in turbo mode type DirectLineProcessor struct { writer TurboWriter diff --git a/internal/server/handlers/turbo_writer_test.go b/internal/server/handlers/turbo_writer_test.go index d8dc8a9..23a07d4 100644 --- a/internal/server/handlers/turbo_writer_test.go +++ b/internal/server/handlers/turbo_writer_test.go @@ -3,8 +3,11 @@ package handlers import ( "bytes" "strings" + "sync/atomic" "testing" + "time" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) @@ -353,6 +356,47 @@ func TestTurboChannelWriter_Stats(t *testing.T) { } } +func TestTurboNetworkWriterStopsWaitingWhenGenerationBecomesStale(t *testing.T) { + originalLogger := dlog.Server + dlog.Server = &dlog.DLog{} + t.Cleanup(func() { + dlog.Server = originalLogger + }) + + turboLines := make(chan []byte, 1) + turboLines <- []byte("occupied") + + var activeGeneration atomic.Uint64 + activeGeneration.Store(1) + + writer := &TurboNetworkWriter{ + turboLines: turboLines, + generation: 1, + activeGeneration: func() uint64 { + return activeGeneration.Load() + }, + } + + go func() { + time.Sleep(10 * time.Millisecond) + activeGeneration.Store(2) + }() + + done := make(chan error, 1) + go func() { + done <- writer.WriteLineData([]byte("stale line"), 1, "app.log") + }() + + select { + case err := <-done: + if err != nil { + t.Fatalf("WriteLineData returned unexpected error: %v", err) + } + case <-time.After(150 * time.Millisecond): + t.Fatal("WriteLineData did not stop after the generation became stale") + } +} + // TestDirectLineProcessor tests the line processor wrapper // Note: Skipped because DirectLineProcessor uses dlog.Server which requires initialization func TestDirectLineProcessor(t *testing.T) { |
