summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 08:04:29 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 08:04:29 +0200
commit8ac1d12f8261bce508c99be454ce27df9c1af0a9 (patch)
treefafc2a1b2f384e0c9bc4ae0c4779e1371bb46ec9
parent6dbc03d5c7b6068665e2d95bc66c4f3700323dc8 (diff)
task 399: add capability fallback helper
-rw-r--r--internal/clients/connectors/connector.go4
-rw-r--r--internal/clients/connectors/serverconnection.go24
-rw-r--r--internal/clients/connectors/serverconnection_test.go60
-rw-r--r--internal/clients/connectors/serverless.go7
4 files changed, 90 insertions, 5 deletions
diff --git a/internal/clients/connectors/connector.go b/internal/clients/connectors/connector.go
index 3ab6a08..a803c33 100644
--- a/internal/clients/connectors/connector.go
+++ b/internal/clients/connectors/connector.go
@@ -2,6 +2,7 @@ package connectors
import (
"context"
+ "time"
"github.com/mimecast/dtail/internal/clients/handlers"
)
@@ -14,4 +15,7 @@ type Connector interface {
Server() string
// Handler for the connection.
Handler() handlers.Handler
+ // SupportsQueryUpdates reports whether the connected server advertised
+ // runtime query replacement support within the given timeout.
+ SupportsQueryUpdates(timeout time.Duration) bool
}
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
index 1136bf9..97c02eb 100644
--- a/internal/clients/connectors/serverconnection.go
+++ b/internal/clients/connectors/serverconnection.go
@@ -13,6 +13,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/ssh/client"
"golang.org/x/crypto/ssh"
@@ -27,6 +28,7 @@ type SSHSettings interface {
const (
defaultSSHConnectTimeout = 2 * time.Second
defaultSSHPort = 2222
+ defaultCapabilityWait = 250 * time.Millisecond
)
// ServerConnection represents a connection to a single remote dtail server via
@@ -94,6 +96,13 @@ func (c *ServerConnection) Server() string { return c.server }
// Handler returns the handler used for the connection.
func (c *ServerConnection) Handler() handlers.Handler { return c.handler }
+// SupportsQueryUpdates reports whether the remote server advertised the
+// runtime query replacement capability. Older servers simply time out and
+// return false here without affecting the legacy command path.
+func (c *ServerConnection) SupportsQueryUpdates(timeout time.Duration) bool {
+ return supportsQueryUpdates(c.handler, timeout)
+}
+
// Attempt to parse the server port address from the provided server FQDN.
func (c *ServerConnection) initServerPort(defaultPort int) {
parts := strings.Split(c.server, ":")
@@ -319,3 +328,18 @@ func extractAuthKeyBase64(authKeyPubBytes []byte) (string, error) {
return "", fmt.Errorf("no public key found")
}
+
+func supportsQueryUpdates(handler handlers.Handler, timeout time.Duration) bool {
+ if handler == nil {
+ return false
+ }
+
+ if timeout <= 0 {
+ timeout = defaultCapabilityWait
+ }
+ if !handler.WaitForCapabilities(timeout) {
+ return false
+ }
+
+ return handler.HasCapability(protocol.CapabilityQueryUpdateV1)
+}
diff --git a/internal/clients/connectors/serverconnection_test.go b/internal/clients/connectors/serverconnection_test.go
index be53cd4..9307b24 100644
--- a/internal/clients/connectors/serverconnection_test.go
+++ b/internal/clients/connectors/serverconnection_test.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
"golang.org/x/crypto/ssh"
)
@@ -129,6 +130,49 @@ func TestNewServerConnectionFallsBackToDefaults(t *testing.T) {
}
}
+func TestServerConnectionSupportsQueryUpdates(t *testing.T) {
+ resetClientLogger(t)
+
+ conn := &ServerConnection{
+ handler: &mockHandler{
+ waitForCapabilities: true,
+ capabilities: map[string]bool{
+ protocol.CapabilityQueryUpdateV1: true,
+ },
+ },
+ }
+
+ if !conn.SupportsQueryUpdates(10 * time.Millisecond) {
+ t.Fatalf("expected query-update capability to be detected")
+ }
+}
+
+func TestServerConnectionSupportsQueryUpdatesFallsBackForOlderServers(t *testing.T) {
+ resetClientLogger(t)
+
+ conn := &ServerConnection{
+ handler: &mockHandler{},
+ }
+
+ if conn.SupportsQueryUpdates(5 * time.Millisecond) {
+ t.Fatalf("expected old-server fallback when no capability is advertised")
+ }
+}
+
+func TestServerConnectionSupportsQueryUpdatesRequiresCapabilityFlag(t *testing.T) {
+ resetClientLogger(t)
+
+ conn := &ServerConnection{
+ handler: &mockHandler{
+ waitForCapabilities: true,
+ },
+ }
+
+ if conn.SupportsQueryUpdates(10 * time.Millisecond) {
+ t.Fatalf("expected capability wait success alone to be insufficient")
+ }
+}
+
type testSSHSettings struct {
port int
timeout time.Duration
@@ -165,7 +209,9 @@ func resetClientLogger(t *testing.T) {
}
type mockHandler struct {
- commands []string
+ commands []string
+ capabilities map[string]bool
+ waitForCapabilities bool
}
var _ handlers.Handler = (*mockHandler)(nil)
@@ -176,11 +222,15 @@ func (m *mockHandler) SendMessage(command string) error {
}
func (m *mockHandler) Capabilities() []string {
- return nil
+ var capabilities []string
+ for capability := range m.capabilities {
+ capabilities = append(capabilities, capability)
+ }
+ return capabilities
}
-func (m *mockHandler) HasCapability(string) bool {
- return false
+func (m *mockHandler) HasCapability(name string) bool {
+ return m.capabilities[name]
}
func (m *mockHandler) Server() string {
@@ -200,7 +250,7 @@ func (m *mockHandler) Done() <-chan struct{} {
}
func (m *mockHandler) WaitForCapabilities(timeout time.Duration) bool {
- return false
+ return m.waitForCapabilities
}
func (m *mockHandler) Read(_ []byte) (int, error) {
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index daf825f..0ebe069 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -3,6 +3,7 @@ package connectors
import (
"context"
"io"
+ "time"
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -47,6 +48,12 @@ func (s *Serverless) Handler() handlers.Handler {
return s.handler
}
+// SupportsQueryUpdates reports whether the in-process server advertised
+// runtime query update support to the client handler.
+func (s *Serverless) SupportsQueryUpdates(timeout time.Duration) bool {
+ return supportsQueryUpdates(s.handler, timeout)
+}
+
// Start the serverless connection.
func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc,
throttleCh, statsCh chan struct{}) {