diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-01 00:10:48 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-01 00:10:48 +0300 |
| commit | 3c0fa29aad92ee81e6662989714fee5046f5a6b9 (patch) | |
| tree | 8c21fc9839224c92d096a7761dc716a1f0fac58a /internal/server/handlers | |
| parent | a3f6bb625aad2cd4a0c86af44feaa22aa401331f (diff) | |
feat: ensure command doesn't complete until all pending files are processed
In turbo mode, prevent Start() from returning until all pending files
have been fully processed, not just queued. This prevents commandFinished()
from being called prematurely which could trigger shutdown while files
are still being processed due to concurrency limits.
This partially addresses the issue with TestDCat2 failing when
MaxConcurrentCats=2, though further investigation is needed for
complete resolution.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 2920044..44d5e99 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -63,6 +63,20 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, dlog.Server.Debug("Reading data from file(s)") r.readGlob(ctx, ltx, args[1], re, retries) + + // In turbo mode, ensure we don't return until all pending files are processed + // This prevents commandFinished() from being called too early + if config.Env("DTAIL_TURBOBOOST_ENABLE") && r.server.aggregate == nil { + for atomic.LoadInt32(&r.server.pendingFiles) > 0 { + dlog.Server.Debug(r.server.user, "Waiting for pending files to complete", "pending", atomic.LoadInt32(&r.server.pendingFiles)) + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + } + } + dlog.Server.Debug(r.server.user, "All pending files completed") + } } func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, @@ -167,9 +181,19 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining) // Check if we should trigger shutdown now + // Only shutdown if no files are pending AND no commands are active if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { - dlog.Server.Debug(r.server.user, "No active commands and no pending files, triggering shutdown") - r.server.shutdown() + // Double-check that we really have no pending work + // In turbo mode, there might be a race condition + time.Sleep(10 * time.Millisecond) + finalPending := atomic.LoadInt32(&r.server.pendingFiles) + finalActive := atomic.LoadInt32(&r.server.activeCommands) + if finalPending == 0 && finalActive == 0 { + dlog.Server.Debug(r.server.user, "No active commands and no pending files after double-check, triggering shutdown") + r.server.shutdown() + } else { + dlog.Server.Debug(r.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive) + } } }() |
