diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 08:04:29 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 08:04:29 +0200 |
| commit | 8ac1d12f8261bce508c99be454ce27df9c1af0a9 (patch) | |
| tree | fafc2a1b2f384e0c9bc4ae0c4779e1371bb46ec9 | |
| parent | 6dbc03d5c7b6068665e2d95bc66c4f3700323dc8 (diff) | |
task 399: add capability fallback helper
| -rw-r--r-- | internal/clients/connectors/connector.go | 4 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 24 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection_test.go | 60 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 7 |
4 files changed, 90 insertions, 5 deletions
diff --git a/internal/clients/connectors/connector.go b/internal/clients/connectors/connector.go index 3ab6a08..a803c33 100644 --- a/internal/clients/connectors/connector.go +++ b/internal/clients/connectors/connector.go @@ -2,6 +2,7 @@ package connectors import ( "context" + "time" "github.com/mimecast/dtail/internal/clients/handlers" ) @@ -14,4 +15,7 @@ type Connector interface { Server() string // Handler for the connection. Handler() handlers.Handler + // SupportsQueryUpdates reports whether the connected server advertised + // runtime query replacement support within the given timeout. + SupportsQueryUpdates(timeout time.Duration) bool } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 1136bf9..97c02eb 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -13,6 +13,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/ssh/client" "golang.org/x/crypto/ssh" @@ -27,6 +28,7 @@ type SSHSettings interface { const ( defaultSSHConnectTimeout = 2 * time.Second defaultSSHPort = 2222 + defaultCapabilityWait = 250 * time.Millisecond ) // ServerConnection represents a connection to a single remote dtail server via @@ -94,6 +96,13 @@ func (c *ServerConnection) Server() string { return c.server } // Handler returns the handler used for the connection. func (c *ServerConnection) Handler() handlers.Handler { return c.handler } +// SupportsQueryUpdates reports whether the remote server advertised the +// runtime query replacement capability. Older servers simply time out and +// return false here without affecting the legacy command path. +func (c *ServerConnection) SupportsQueryUpdates(timeout time.Duration) bool { + return supportsQueryUpdates(c.handler, timeout) +} + // Attempt to parse the server port address from the provided server FQDN. func (c *ServerConnection) initServerPort(defaultPort int) { parts := strings.Split(c.server, ":") @@ -319,3 +328,18 @@ func extractAuthKeyBase64(authKeyPubBytes []byte) (string, error) { return "", fmt.Errorf("no public key found") } + +func supportsQueryUpdates(handler handlers.Handler, timeout time.Duration) bool { + if handler == nil { + return false + } + + if timeout <= 0 { + timeout = defaultCapabilityWait + } + if !handler.WaitForCapabilities(timeout) { + return false + } + + return handler.HasCapability(protocol.CapabilityQueryUpdateV1) +} diff --git a/internal/clients/connectors/serverconnection_test.go b/internal/clients/connectors/serverconnection_test.go index be53cd4..9307b24 100644 --- a/internal/clients/connectors/serverconnection_test.go +++ b/internal/clients/connectors/serverconnection_test.go @@ -9,6 +9,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" "golang.org/x/crypto/ssh" ) @@ -129,6 +130,49 @@ func TestNewServerConnectionFallsBackToDefaults(t *testing.T) { } } +func TestServerConnectionSupportsQueryUpdates(t *testing.T) { + resetClientLogger(t) + + conn := &ServerConnection{ + handler: &mockHandler{ + waitForCapabilities: true, + capabilities: map[string]bool{ + protocol.CapabilityQueryUpdateV1: true, + }, + }, + } + + if !conn.SupportsQueryUpdates(10 * time.Millisecond) { + t.Fatalf("expected query-update capability to be detected") + } +} + +func TestServerConnectionSupportsQueryUpdatesFallsBackForOlderServers(t *testing.T) { + resetClientLogger(t) + + conn := &ServerConnection{ + handler: &mockHandler{}, + } + + if conn.SupportsQueryUpdates(5 * time.Millisecond) { + t.Fatalf("expected old-server fallback when no capability is advertised") + } +} + +func TestServerConnectionSupportsQueryUpdatesRequiresCapabilityFlag(t *testing.T) { + resetClientLogger(t) + + conn := &ServerConnection{ + handler: &mockHandler{ + waitForCapabilities: true, + }, + } + + if conn.SupportsQueryUpdates(10 * time.Millisecond) { + t.Fatalf("expected capability wait success alone to be insufficient") + } +} + type testSSHSettings struct { port int timeout time.Duration @@ -165,7 +209,9 @@ func resetClientLogger(t *testing.T) { } type mockHandler struct { - commands []string + commands []string + capabilities map[string]bool + waitForCapabilities bool } var _ handlers.Handler = (*mockHandler)(nil) @@ -176,11 +222,15 @@ func (m *mockHandler) SendMessage(command string) error { } func (m *mockHandler) Capabilities() []string { - return nil + var capabilities []string + for capability := range m.capabilities { + capabilities = append(capabilities, capability) + } + return capabilities } -func (m *mockHandler) HasCapability(string) bool { - return false +func (m *mockHandler) HasCapability(name string) bool { + return m.capabilities[name] } func (m *mockHandler) Server() string { @@ -200,7 +250,7 @@ func (m *mockHandler) Done() <-chan struct{} { } func (m *mockHandler) WaitForCapabilities(timeout time.Duration) bool { - return false + return m.waitForCapabilities } func (m *mockHandler) Read(_ []byte) (int, error) { diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index daf825f..0ebe069 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -3,6 +3,7 @@ package connectors import ( "context" "io" + "time" "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/io/dlog" @@ -47,6 +48,12 @@ func (s *Serverless) Handler() handlers.Handler { return s.handler } +// SupportsQueryUpdates reports whether the in-process server advertised +// runtime query update support to the client handler. +func (s *Serverless) SupportsQueryUpdates(timeout time.Duration) bool { + return supportsQueryUpdates(s.handler, timeout) +} + // Start the serverless connection. func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { |
