diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 10:05:47 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 10:05:47 +0200 |
| commit | 6c3bc11f736040a09fd839832a6be01e434e8aab (patch) | |
| tree | 6b856c2f79d2f75ccd8ba89c638ee18839b4d061 /internal/server/handlers/readcommand.go | |
| parent | a5a405d79fe3d9e0c6ea081b425d36bd67d8671d (diff) | |
Stop stale query work promptly on generation cancel
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 9c85889..9677718 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/ctxutil" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" @@ -88,7 +89,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, paths, err := filepath.Glob(glob) if err != nil { dlog.Server.Warn(r.server.LogContext(), glob, err) - time.Sleep(retryInterval) + if !ctxutil.Sleep(ctx, retryInterval) { + return + } continue } @@ -101,7 +104,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, return default: } - time.Sleep(retryInterval) + if !ctxutil.Sleep(ctx, retryInterval) { + return + } continue } @@ -132,6 +137,12 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths)) + select { + case <-ctx.Done(): + return + default: + } + // In turbo mode, signal EOF once all pending file work is drained. // Active command count may still include side-effect commands (for example AUTHKEY), // so relying on "active == 1" can skip EOF signaling and lead to dropped output. @@ -160,7 +171,9 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, if r.server.WaitForTurboEOFAck(timeout) { dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged") // Allow transport buffers to flush after acknowledgement. - time.Sleep(r.server.ShutdownTurboSerializeWait()) + if !ctxutil.Sleep(ctx, r.server.ShutdownTurboSerializeWait()) { + return + } } else { dlog.Server.Warn( r.server.LogContext(), @@ -305,7 +318,9 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext } } - time.Sleep(r.server.ReadRetryInterval()) + if !ctxutil.Sleep(ctx, r.server.ReadRetryInterval()) { + return + } dlog.Server.Info(path, globID, "Reading file again") } } @@ -362,7 +377,9 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri // Skip this delay in serverless mode since data is written directly to stdout if !r.server.Serverless() { dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission") - time.Sleep(r.server.TurboDataTransmissionDelay()) + if !ctxutil.Sleep(ctx, r.server.TurboDataTransmissionDelay()) { + return startErr + } } return startErr |
