summaryrefslogtreecommitdiff
path: root/internal/server/handlers
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-01 00:10:48 +0300
committerPaul Buetow <paul@buetow.org>2025-07-01 00:10:48 +0300
commit3c0fa29aad92ee81e6662989714fee5046f5a6b9 (patch)
tree8c21fc9839224c92d096a7761dc716a1f0fac58a /internal/server/handlers
parenta3f6bb625aad2cd4a0c86af44feaa22aa401331f (diff)
feat: ensure command doesn't complete until all pending files are processed
In turbo mode, prevent Start() from returning until all pending files have been fully processed, not just queued. This prevents commandFinished() from being called prematurely which could trigger shutdown while files are still being processed due to concurrency limits. This partially addresses the issue with TestDCat2 failing when MaxConcurrentCats=2, though further investigation is needed for complete resolution. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers')
-rw-r--r--internal/server/handlers/readcommand.go28
1 files changed, 26 insertions, 2 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 2920044..44d5e99 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -63,6 +63,20 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
dlog.Server.Debug("Reading data from file(s)")
r.readGlob(ctx, ltx, args[1], re, retries)
+
+ // In turbo mode, ensure we don't return until all pending files are processed
+ // This prevents commandFinished() from being called too early
+ if config.Env("DTAIL_TURBOBOOST_ENABLE") && r.server.aggregate == nil {
+ for atomic.LoadInt32(&r.server.pendingFiles) > 0 {
+ dlog.Server.Debug(r.server.user, "Waiting for pending files to complete", "pending", atomic.LoadInt32(&r.server.pendingFiles))
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(100 * time.Millisecond):
+ }
+ }
+ dlog.Server.Debug(r.server.user, "All pending files completed")
+ }
}
func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
@@ -167,9 +181,19 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining)
// Check if we should trigger shutdown now
+ // Only shutdown if no files are pending AND no commands are active
if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
- dlog.Server.Debug(r.server.user, "No active commands and no pending files, triggering shutdown")
- r.server.shutdown()
+ // Double-check that we really have no pending work
+ // In turbo mode, there might be a race condition
+ time.Sleep(10 * time.Millisecond)
+ finalPending := atomic.LoadInt32(&r.server.pendingFiles)
+ finalActive := atomic.LoadInt32(&r.server.activeCommands)
+ if finalPending == 0 && finalActive == 0 {
+ dlog.Server.Debug(r.server.user, "No active commands and no pending files after double-check, triggering shutdown")
+ r.server.shutdown()
+ } else {
+ dlog.Server.Debug(r.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
+ }
}
}()