summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-03 15:38:07 +0200
committerPaul Buetow <paul@buetow.org>2026-03-03 15:38:07 +0200
commit56abee605f02f6975d68f094a13eca2890c31380 (patch)
treeae08ad18a90f2a056baedfb54945ca0e38d8a96f /internal/server/handlers/readcommand.go
parent46d4917ea0eaa587e87602200fb6843776cc62a5 (diff)
Stabilize integration-mode auth tests and concurrent dcat reads
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go41
1 files changed, 30 insertions, 11 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index dc3196e..f99c740 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -129,14 +129,14 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths))
- // In turbo mode, only the final active command should signal EOF and wait for
- // acknowledgement. Signaling per command in high-concurrency cat/grep sessions
- // causes repeated EOF timeouts and races with still-running commands.
+ // In turbo mode, signal EOF once all pending file work is drained.
+ // Active command count may still include side-effect commands (for example AUTHKEY),
+ // so relying on "active == 1" can skip EOF signaling and lead to dropped output.
if !r.server.TurboBoostDisabled() && !r.server.HasRegularAggregate() &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
if r.server.IsTurboMode() && r.server.HasTurboEOF() {
pending, active := r.server.PendingAndActive()
- shouldSignalEOF := pending == 0 && active == 1
+ shouldSignalEOF := pending == 0
if !shouldSignalEOF {
dlog.Server.Trace(r.server.LogContext(), "Skipping turbo EOF signal for non-final command",
"pending", pending, "active", active)
@@ -244,11 +244,25 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// MapReduce now has a turbo mode implementation that bypasses channels
dlog.Server.Debug(r.server.LogContext(), "Checking turbo mode", "turboBoostDisable", r.server.TurboBoostDisabled(),
"mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil, "hasAggregate", r.server.HasRegularAggregate())
- // Only use turbo mode if:
- // 1. Turbo boost is NOT disabled (it's enabled by default) AND
- // 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate)
- if !r.server.TurboBoostDisabled() &&
- (r.server.TurboAggregate() != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate())) {
+ useTurboMode := false
+ if !r.server.TurboBoostDisabled() {
+ if r.server.TurboAggregate() != nil {
+ // Keep turbo mode for MapReduce aggregate flow.
+ useTurboMode = true
+ } else if (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate() {
+ // Shared turbo direct-output flow is fragile under many concurrent commands.
+ // Fall back to channel mode when session command concurrency is high.
+ _, activeCommands := r.server.PendingAndActive()
+ if activeCommands <= 1 {
+ useTurboMode = true
+ } else {
+ dlog.Server.Info(r.server.LogContext(), "Skipping turbo mode for concurrent direct read commands",
+ "mode", r.mode, "activeCommands", activeCommands)
+ }
+ }
+ }
+
+ if useTurboMode {
dlog.Server.Info(r.server.LogContext(), "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
@@ -350,9 +364,14 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
}
func (r *readCommand) ensureTurboModeEnabled() {
- if !r.server.IsTurboMode() {
- r.server.EnableTurboMode()
+ // turboManager.enable() toggles mode and initializes shared channels.
+ // Under high concurrency a goroutine can observe mode=true before channel
+ // initialization is visible and create a writer with a nil turbo channel.
+ // Require both mode and channel to be ready before proceeding.
+ if r.server.IsTurboMode() && r.server.GetTurboChannel() != nil {
+ return
}
+ r.server.EnableTurboMode()
}
func (r *readCommand) makeTurboWriter() TurboWriter {