summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 10:05:47 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 10:05:47 +0200
commit6c3bc11f736040a09fd839832a6be01e434e8aab (patch)
tree6b856c2f79d2f75ccd8ba89c638ee18839b4d061 /internal/server/handlers/readcommand.go
parenta5a405d79fe3d9e0c6ea081b425d36bd67d8671d (diff)
Stop stale query work promptly on generation cancel
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go27
1 files changed, 22 insertions, 5 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 9c85889..9677718 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,6 +9,7 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/ctxutil"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -88,7 +89,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
paths, err := filepath.Glob(glob)
if err != nil {
dlog.Server.Warn(r.server.LogContext(), glob, err)
- time.Sleep(retryInterval)
+ if !ctxutil.Sleep(ctx, retryInterval) {
+ return
+ }
continue
}
@@ -101,7 +104,9 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
return
default:
}
- time.Sleep(retryInterval)
+ if !ctxutil.Sleep(ctx, retryInterval) {
+ return
+ }
continue
}
@@ -132,6 +137,12 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths))
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
// In turbo mode, signal EOF once all pending file work is drained.
// Active command count may still include side-effect commands (for example AUTHKEY),
// so relying on "active == 1" can skip EOF signaling and lead to dropped output.
@@ -160,7 +171,9 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
if r.server.WaitForTurboEOFAck(timeout) {
dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged")
// Allow transport buffers to flush after acknowledgement.
- time.Sleep(r.server.ShutdownTurboSerializeWait())
+ if !ctxutil.Sleep(ctx, r.server.ShutdownTurboSerializeWait()) {
+ return
+ }
} else {
dlog.Server.Warn(
r.server.LogContext(),
@@ -305,7 +318,9 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
}
}
- time.Sleep(r.server.ReadRetryInterval())
+ if !ctxutil.Sleep(ctx, r.server.ReadRetryInterval()) {
+ return
+ }
dlog.Server.Info(path, globID, "Reading file again")
}
}
@@ -362,7 +377,9 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
// Skip this delay in serverless mode since data is written directly to stdout
if !r.server.Serverless() {
dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission")
- time.Sleep(r.server.TurboDataTransmissionDelay())
+ if !ctxutil.Sleep(ctx, r.server.TurboDataTransmissionDelay()) {
+ return startErr
+ }
}
return startErr