From ec1504e0cedbfeffc35e50a09633e51e93bf0e2d Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 5 Mar 2026 19:09:27 +0200 Subject: more on this --- integrationtests/authkey_test.go | 40 +++++++++++++++++++++++++++---- internal/config/initializer.go | 1 + internal/server/handlers/basehandler.go | 17 +++++++++++++ internal/server/handlers/readcommand.go | 2 ++ internal/ssh/client/knownhostscallback.go | 7 ++++-- 5 files changed, 61 insertions(+), 6 deletions(-) diff --git a/integrationtests/authkey_test.go b/integrationtests/authkey_test.go index 40e9ad7..13f0069 100644 --- a/integrationtests/authkey_test.go +++ b/integrationtests/authkey_test.go @@ -92,10 +92,10 @@ func testAuthKeyTTLExpiry(t *testing.T) { t.Fatalf("Expected second connection to succeed, exit=%d err=%v", exitCode, err) } assertDCatSuccessfulOutput(t, "authkey_ttl_2.tmp") - waitForServerLogs() - fastPathCountAfterSecond := server.CountLogLinesContaining(authKeyFastPathLog) + fastPathCountAfterSecond := waitForLogCountAtLeast(server, authKeyFastPathLog, 1, 5*time.Second) if fastPathCountAfterSecond < 1 { - t.Fatalf("Expected fast-path hit before TTL expiry, count=%d", fastPathCountAfterSecond) + t.Fatalf("Expected fast-path hit before TTL expiry, count=%d\nserver logs:\n%s", + fastPathCountAfterSecond, strings.Join(server.LogLines(), "\n")) } time.Sleep(time.Duration(ttlSeconds+1) * time.Second) @@ -195,6 +195,10 @@ func (s *authKeyServer) CountLogLinesContaining(substring string) int { return s.logs.countContaining(substring) } +func (s *authKeyServer) LogLines() []string { + return s.logs.snapshot() +} + type authKeyServerLogs struct { mu sync.Mutex lines []string @@ -225,6 +229,15 @@ func (l *authKeyServerLogs) countContaining(substring string) int { return count } +func (l *authKeyServerLogs) snapshot() []string { + l.mu.Lock() + defer l.mu.Unlock() + + lines := make([]string, len(l.lines)) + copy(lines, l.lines) + return lines +} + func startAuthKeyServer(t *testing.T, cfgFile string) *authKeyServer { t.Helper() @@ -241,7 +254,8 @@ func startAuthKeyServer(t *testing.T, cfgFile string) *authKeyServer { args = append(args, "--cfg", cfgFile) } - stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, "", "../dserver", args...) + stdoutCh, stderrCh, cmdErrCh, err := startCommandWithEnv(ctx, t, "", "../dserver", + map[string]string{"DTAIL_TURBOBOOST_DISABLE": "yes"}, args...) if err != nil { cancel() t.Fatalf("Unable to start dserver: %v", err) @@ -361,3 +375,21 @@ func createAuthKeyPair(t *testing.T, keyName string) string { func waitForServerLogs() { time.Sleep(300 * time.Millisecond) } + +func waitForLogCountAtLeast(server *authKeyServer, substring string, minCount int, timeout time.Duration) int { + if minCount <= 0 { + return server.CountLogLinesContaining(substring) + } + + deadline := time.Now().Add(timeout) + for { + count := server.CountLogLinesContaining(substring) + if count >= minCount { + return count + } + if time.Now().After(deadline) { + return count + } + time.Sleep(100 * time.Millisecond) + } +} diff --git a/internal/config/initializer.go b/internal/config/initializer.go index 6038705..eaf7216 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -92,6 +92,7 @@ func (in *initializer) processEnvVars(args *Args) { if Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { os.Setenv("DTAIL_HOSTNAME_OVERRIDE", "integrationtest") in.Server.MaxLineLength = 1024 + in.Server.TurboBoostDisable = true } sshPrivateKeyPathFile := os.Getenv("DTAIL_SSH_PRIVATE_KEYFILE_PATH") if len(sshPrivateKeyPathFile) > 0 && args.SSHPrivateKeyFilePath == "" { diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index d510139..030baf9 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -77,6 +77,13 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { return n, nil } + pollInterval := time.Second + if h.turbo.enabled() { + // Turbo reads require tighter wake-ups so we can continue draining the turbo channel. + pollInterval = h.turbo.resolvedReadRetryInterval() + } + poll := time.After(pollInterval) + select { case message := <-h.serverMessages: if len(message) > 0 && message[0] == '.' { @@ -131,6 +138,16 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.done.Done(): err = io.EOF return + + case <-poll: + // Wake periodically so turbo mode transitions don't leave this read blocked forever. + select { + case <-h.done.Done(): + err = io.EOF + return + default: + } + return } return } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index f99c740..c03900f 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -372,6 +372,8 @@ func (r *readCommand) ensureTurboModeEnabled() { return } r.server.EnableTurboMode() + // Wake a potentially blocked reader goroutine so it can switch to turbo drain path. + r.server.SendServerMessage(".turbo wake") } func (r *readCommand) makeTurboWriter() TurboWriter { diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go index 26ab245..45451ea 100644 --- a/internal/ssh/client/knownhostscallback.go +++ b/internal/ssh/client/knownhostscallback.go @@ -95,7 +95,9 @@ func (c *KnownHostsCallback) Wrap() ssh.HostKeyCallback { ipLine: knownhosts.Line([]string{remote.String()}, key), responseCh: make(chan response), } - dlog.Client.Warn("Encountered unknown host", unknown) + // Keep host trust discovery diagnostics out of normal command output. + // In trust-all and plain modes this warning can corrupt tool output. + dlog.Client.Debug("Encountered unknown host", unknown.server, unknown.remote.String()) // Notify user that there is an unknown host c.unknownCh <- unknown // Wait for user input. @@ -148,7 +150,8 @@ func (c *KnownHostsCallback) promptAddHosts(hosts []unknownHost) { select { case <-c.trustAllHostsCh: - dlog.Client.Warn("Trusting host keys of servers", servers) + // Trust-all mode is non-interactive; avoid warning-level noise on stdout. + dlog.Client.Debug("Trusting host keys of servers", servers) c.trustHosts(hosts) return default: -- cgit v1.2.3