diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-03 15:38:07 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-03 15:38:07 +0200 |
| commit | 56abee605f02f6975d68f094a13eca2890c31380 (patch) | |
| tree | ae08ad18a90f2a056baedfb54945ca0e38d8a96f /internal/server/handlers/readcommand.go | |
| parent | 46d4917ea0eaa587e87602200fb6843776cc62a5 (diff) | |
Stabilize integration-mode auth tests and concurrent dcat reads
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 41 |
1 files changed, 30 insertions, 11 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index dc3196e..f99c740 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -129,14 +129,14 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths)) - // In turbo mode, only the final active command should signal EOF and wait for - // acknowledgement. Signaling per command in high-concurrency cat/grep sessions - // causes repeated EOF timeouts and races with still-running commands. + // In turbo mode, signal EOF once all pending file work is drained. + // Active command count may still include side-effect commands (for example AUTHKEY), + // so relying on "active == 1" can skip EOF signaling and lead to dropped output. if !r.server.TurboBoostDisabled() && !r.server.HasRegularAggregate() && (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { if r.server.IsTurboMode() && r.server.HasTurboEOF() { pending, active := r.server.PendingAndActive() - shouldSignalEOF := pending == 0 && active == 1 + shouldSignalEOF := pending == 0 if !shouldSignalEOF { dlog.Server.Trace(r.server.LogContext(), "Skipping turbo EOF signal for non-final command", "pending", pending, "active", active) @@ -244,11 +244,25 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, // MapReduce now has a turbo mode implementation that bypasses channels dlog.Server.Debug(r.server.LogContext(), "Checking turbo mode", "turboBoostDisable", r.server.TurboBoostDisabled(), "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil, "hasAggregate", r.server.HasRegularAggregate()) - // Only use turbo mode if: - // 1. Turbo boost is NOT disabled (it's enabled by default) AND - // 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate) - if !r.server.TurboBoostDisabled() && - (r.server.TurboAggregate() != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate())) { + useTurboMode := false + if !r.server.TurboBoostDisabled() { + if r.server.TurboAggregate() != nil { + // Keep turbo mode for MapReduce aggregate flow. + useTurboMode = true + } else if (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate() { + // Shared turbo direct-output flow is fragile under many concurrent commands. + // Fall back to channel mode when session command concurrency is high. + _, activeCommands := r.server.PendingAndActive() + if activeCommands <= 1 { + useTurboMode = true + } else { + dlog.Server.Info(r.server.LogContext(), "Skipping turbo mode for concurrent direct read commands", + "mode", r.mode, "activeCommands", activeCommands) + } + } + } + + if useTurboMode { dlog.Server.Info(r.server.LogContext(), "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil) r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return @@ -350,9 +364,14 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri } func (r *readCommand) ensureTurboModeEnabled() { - if !r.server.IsTurboMode() { - r.server.EnableTurboMode() + // turboManager.enable() toggles mode and initializes shared channels. + // Under high concurrency a goroutine can observe mode=true before channel + // initialization is visible and create a writer with a nil turbo channel. + // Require both mode and channel to be ready before proceeding. + if r.server.IsTurboMode() && r.server.GetTurboChannel() != nil { + return } + r.server.EnableTurboMode() } func (r *readCommand) makeTurboWriter() TurboWriter { |
