diff options
| -rw-r--r-- | internal/server/handlers/readcommand.go | 41 | ||||
| -rw-r--r-- | internal/ssh/client/authmethods_test.go | 4 |
2 files changed, 34 insertions, 11 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index dc3196e..f99c740 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -129,14 +129,14 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths)) - // In turbo mode, only the final active command should signal EOF and wait for - // acknowledgement. Signaling per command in high-concurrency cat/grep sessions - // causes repeated EOF timeouts and races with still-running commands. + // In turbo mode, signal EOF once all pending file work is drained. + // Active command count may still include side-effect commands (for example AUTHKEY), + // so relying on "active == 1" can skip EOF signaling and lead to dropped output. if !r.server.TurboBoostDisabled() && !r.server.HasRegularAggregate() && (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { if r.server.IsTurboMode() && r.server.HasTurboEOF() { pending, active := r.server.PendingAndActive() - shouldSignalEOF := pending == 0 && active == 1 + shouldSignalEOF := pending == 0 if !shouldSignalEOF { dlog.Server.Trace(r.server.LogContext(), "Skipping turbo EOF signal for non-final command", "pending", pending, "active", active) @@ -244,11 +244,25 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, // MapReduce now has a turbo mode implementation that bypasses channels dlog.Server.Debug(r.server.LogContext(), "Checking turbo mode", "turboBoostDisable", r.server.TurboBoostDisabled(), "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil, "hasAggregate", r.server.HasRegularAggregate()) - // Only use turbo mode if: - // 1. Turbo boost is NOT disabled (it's enabled by default) AND - // 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate) - if !r.server.TurboBoostDisabled() && - (r.server.TurboAggregate() != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate())) { + useTurboMode := false + if !r.server.TurboBoostDisabled() { + if r.server.TurboAggregate() != nil { + // Keep turbo mode for MapReduce aggregate flow. + useTurboMode = true + } else if (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate() { + // Shared turbo direct-output flow is fragile under many concurrent commands. + // Fall back to channel mode when session command concurrency is high. + _, activeCommands := r.server.PendingAndActive() + if activeCommands <= 1 { + useTurboMode = true + } else { + dlog.Server.Info(r.server.LogContext(), "Skipping turbo mode for concurrent direct read commands", + "mode", r.mode, "activeCommands", activeCommands) + } + } + } + + if useTurboMode { dlog.Server.Info(r.server.LogContext(), "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil) r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return @@ -350,9 +364,14 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri } func (r *readCommand) ensureTurboModeEnabled() { - if !r.server.IsTurboMode() { - r.server.EnableTurboMode() + // turboManager.enable() toggles mode and initializes shared channels. + // Under high concurrency a goroutine can observe mode=true before channel + // initialization is visible and create a writer with a nil turbo channel. + // Require both mode and channel to be ready before proceeding. + if r.server.IsTurboMode() && r.server.GetTurboChannel() != nil { + return } + r.server.EnableTurboMode() } func (r *readCommand) makeTurboWriter() TurboWriter { diff --git a/internal/ssh/client/authmethods_test.go b/internal/ssh/client/authmethods_test.go index e1e92b0..c6715fa 100644 --- a/internal/ssh/client/authmethods_test.go +++ b/internal/ssh/client/authmethods_test.go @@ -49,6 +49,8 @@ func (s *mockSigner) Sign(_ io.Reader, _ []byte) (*gossh.Signature, error) { func TestCollectKnownHostsAuthMethodsOrder(t *testing.T) { homeDir := "/tmp/dtail-auth-order" t.Setenv("HOME", homeDir) + // Keep this unit test deterministic regardless of integration-mode env. + t.Setenv("DTAIL_INTEGRATION_TEST_RUN_MODE", "") originalPrivateKeySigner := privateKeySigner originalAgentSigners := agentSigners @@ -107,6 +109,8 @@ func TestCollectKnownHostsAuthMethodsOrder(t *testing.T) { func TestCollectKnownHostsAuthMethodsSkipsDuplicateDefaultPath(t *testing.T) { homeDir := "/tmp/dtail-auth-dedupe" t.Setenv("HOME", homeDir) + // Keep this unit test deterministic regardless of integration-mode env. + t.Setenv("DTAIL_INTEGRATION_TEST_RUN_MODE", "") originalPrivateKeySigner := privateKeySigner originalAgentSigners := agentSigners |
