diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-19 22:59:48 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-19 22:59:48 +0200 |
| commit | 4593c3a2d86dc6cc34fa8546406ca5660436cb5d (patch) | |
| tree | d6e4cea9f26eb212123d1e2aa8859cbda1131aba | |
| parent | 6fcade7191d5d427bd9f6d7a9a6958456ac1de57 (diff) | |
task 213: harden interactive query session transitions
| -rw-r--r-- | integrationtests/interactive_runtime_query_test.go | 37 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection_test.go | 148 | ||||
| -rw-r--r-- | internal/clients/connectors/sessiontransport.go | 6 |
3 files changed, 184 insertions, 7 deletions
diff --git a/integrationtests/interactive_runtime_query_test.go b/integrationtests/interactive_runtime_query_test.go index 48e2301..b338065 100644 --- a/integrationtests/interactive_runtime_query_test.go +++ b/integrationtests/interactive_runtime_query_test.go @@ -58,10 +58,14 @@ func TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches(t *testing.T) writerDone := make(chan error, 1) go func() { + if err := waitForCollectorSubstring(ctx, serverLogs, "Start reading|"+followFile+"|"+followFile); err != nil { + writerDone <- err + return + } writerDone <- appendLinesOnSchedule(ctx, followFile, []interactiveStep{ - {Delay: 200 * time.Millisecond, Input: "ERROR initial"}, - {Delay: 1200 * time.Millisecond, Input: "ERROR late"}, - {Delay: 1400 * time.Millisecond, Input: "WARN live"}, + {Delay: 100 * time.Millisecond, Input: "ERROR initial"}, + {Delay: 3000 * time.Millisecond, Input: "ERROR late"}, + {Delay: 3200 * time.Millisecond, Input: "WARN live"}, }) }() @@ -77,8 +81,8 @@ func TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches(t *testing.T) "--trustAllHosts", "--interactive-query", }, []interactiveStep{ - {Delay: 700 * time.Millisecond, Input: ":reload --grep WARN"}, - {Delay: 1700 * time.Millisecond, Input: ":quit"}, + {Delay: 3 * time.Second, Input: ":reload --grep WARN"}, + {Delay: 6 * time.Second, Input: ":quit"}, }) if err != nil { t.Fatalf("run interactive dtail: %v\noutput:\n%s", err, clientOutput) @@ -151,8 +155,8 @@ func TestDGrepInteractiveReloadReusesSessionAfterCompletedRead(t *testing.T) { "--trustAllHosts", "--interactive-query", }, []interactiveStep{ - {Delay: 400 * time.Millisecond, Input: ":reload --grep WARN"}, - {Delay: 1000 * time.Millisecond, Input: ":quit"}, + {Delay: 3500 * time.Millisecond, Input: ":reload --grep WARN"}, + {Delay: 5500 * time.Millisecond, Input: ":quit"}, }) if err != nil { t.Fatalf("run interactive dgrep: %v\noutput:\n%s", err, clientOutput) @@ -324,3 +328,22 @@ func countSubstring(lines []string, needle string) int { } return count } + +func waitForCollectorSubstring(ctx context.Context, collector *processOutputCollector, needle string) error { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { + for _, line := range collector.snapshot() { + if strings.Contains(line, needle) { + return nil + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} diff --git a/internal/clients/connectors/serverconnection_test.go b/internal/clients/connectors/serverconnection_test.go index 01fe4af..0cb15c2 100644 --- a/internal/clients/connectors/serverconnection_test.go +++ b/internal/clients/connectors/serverconnection_test.go @@ -5,6 +5,8 @@ import ( "errors" "os" "path/filepath" + "strings" + "sync" "testing" "time" @@ -363,6 +365,65 @@ func TestServerConnectionApplySessionSpecTimesOutWaitingForAck(t *testing.T) { } } +func TestApplySessionSpecSerializesConcurrentBootstrapAndReload(t *testing.T) { + resetClientLogger(t) + + handler := newBlockingSessionHandler() + state := &committedSessionState{} + + initialSpec := sessionspec.Spec{ + Mode: omode.TailClient, + Files: []string{"/var/log/app.log"}, + Regex: "ERROR", + } + reloadSpec := sessionspec.Spec{ + Mode: omode.TailClient, + Files: []string{"/var/log/app.log"}, + Regex: "WARN", + } + + initialErrCh := make(chan error, 1) + go func() { + initialErrCh <- dispatchInitialCommands("srv1", handler, nil, true, initialSpec, state) + }() + + firstCommand := <-handler.commandsCh + if !strings.HasPrefix(firstCommand, "SESSION START ") { + t.Fatalf("expected initial SESSION START command, got %q", firstCommand) + } + + reloadErrCh := make(chan error, 1) + go func() { + reloadErrCh <- applySessionSpec("srv1", handler, state, reloadSpec, 50*time.Millisecond) + }() + + select { + case command := <-handler.commandsCh: + t.Fatalf("unexpected concurrent session command before bootstrap ack: %q", command) + case <-time.After(10 * time.Millisecond): + } + + handler.ackCh <- handlers.SessionAck{Action: "start", Generation: 1} + if err := <-initialErrCh; err != nil { + t.Fatalf("dispatchInitialCommands() error = %v", err) + } + + secondCommand := <-handler.commandsCh + if !strings.HasPrefix(secondCommand, "SESSION UPDATE 2 ") { + t.Fatalf("expected reload to send SESSION UPDATE after bootstrap, got %q", secondCommand) + } + + handler.ackCh <- handlers.SessionAck{Action: "update", Generation: 2} + if err := <-reloadErrCh; err != nil { + t.Fatalf("applySessionSpec() error = %v", err) + } + + committedSpec, generation, ok := state.snapshot() + if !ok || generation != 2 || committedSpec.Regex != "WARN" { + t.Fatalf("unexpected committed session after reload: spec=%#v generation=%d ok=%v", committedSpec, generation, ok) + } +} + type testSSHSettings struct { port int timeout time.Duration @@ -464,3 +525,90 @@ func (m *mockHandler) Read(_ []byte) (int, error) { func (m *mockHandler) Write(p []byte) (int, error) { return len(p), nil } + +type blockingSessionHandler struct { + mu sync.Mutex + commands []string + commandsCh chan string + ackCh chan handlers.SessionAck + capabilities map[string]bool +} + +func newBlockingSessionHandler() *blockingSessionHandler { + return &blockingSessionHandler{ + commandsCh: make(chan string, 8), + ackCh: make(chan handlers.SessionAck, 8), + capabilities: map[string]bool{ + protocol.CapabilityQueryUpdateV1: true, + }, + } +} + +var _ handlers.Handler = (*blockingSessionHandler)(nil) + +func (h *blockingSessionHandler) SendMessage(command string) error { + h.mu.Lock() + h.commands = append(h.commands, command) + h.mu.Unlock() + h.commandsCh <- command + return nil +} + +func (h *blockingSessionHandler) Capabilities() []string { + capabilities := make([]string, 0, len(h.capabilities)) + for capability := range h.capabilities { + capabilities = append(capabilities, capability) + } + return capabilities +} + +func (h *blockingSessionHandler) HasCapability(name string) bool { + return h.capabilities[name] +} + +func (*blockingSessionHandler) Server() string { + return "mock" +} + +func (*blockingSessionHandler) Status() int { + return 0 +} + +func (*blockingSessionHandler) Shutdown() {} + +func (*blockingSessionHandler) Done() <-chan struct{} { + return make(chan struct{}) +} + +func (*blockingSessionHandler) WaitForCapabilities(time.Duration) bool { + return true +} + +func (h *blockingSessionHandler) WaitForSessionAck(timeout time.Duration) (handlers.SessionAck, bool) { + if timeout <= 0 { + select { + case ack := <-h.ackCh: + return ack, true + default: + return handlers.SessionAck{}, false + } + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case ack := <-h.ackCh: + return ack, true + case <-timer.C: + return handlers.SessionAck{}, false + } +} + +func (*blockingSessionHandler) Read(_ []byte) (int, error) { + return 0, nil +} + +func (*blockingSessionHandler) Write(p []byte) (int, error) { + return len(p), nil +} diff --git a/internal/clients/connectors/sessiontransport.go b/internal/clients/connectors/sessiontransport.go index 84aeb78..428752a 100644 --- a/internal/clients/connectors/sessiontransport.go +++ b/internal/clients/connectors/sessiontransport.go @@ -27,6 +27,7 @@ var ( const defaultSessionAckTimeout = 2 * time.Second type committedSessionState struct { + applyMu sync.Mutex mu sync.RWMutex committed bool generation uint64 @@ -79,6 +80,11 @@ func dispatchInitialCommands(server string, handler handlers.Handler, commands [ func applySessionSpec(server string, handler handlers.Handler, state *committedSessionState, spec sessionspec.Spec, timeout time.Duration) error { + // Serialize session transitions so an interactive reload cannot race the + // initial SESSION START bootstrap on the same connection. + state.applyMu.Lock() + defer state.applyMu.Unlock() + if handler == nil { return ErrSessionUnsupported } |
