diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:29:24 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:29:24 +0200 |
| commit | 29e50d7b6ebb9e6c59d079ef5b7551b1acd950fb (patch) | |
| tree | 147ae88ee00c6b170d1f28a55c89fb4c92fc440f /internal | |
| parent | 8c08e4e60219782e50c3a5f20a051e706196f48c (diff) | |
config: make server timing and buffer knobs configurable
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 7 | ||||
| -rw-r--r-- | internal/config/common.go | 3 | ||||
| -rw-r--r-- | internal/config/server.go | 41 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 17 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 70 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 1 | ||||
| -rw-r--r-- | internal/server/handlers/shutdown_coordinator.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_manager.go | 70 |
8 files changed, 193 insertions, 20 deletions
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index d114d06..3c29ac0 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -41,6 +41,11 @@ func NewServerConnection(server string, userName string, handler handlers.Handler, commands []string) *ServerConnection { dlog.Client.Debug(server, "Creating new connection", server, handler, commands) + sshConnectTimeout := time.Duration(config.Common.SSHConnectTimeoutMs) * time.Millisecond + if sshConnectTimeout <= 0 { + sshConnectTimeout = 2 * time.Second + } + c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, @@ -50,7 +55,7 @@ func NewServerConnection(server string, userName string, User: userName, Auth: authMethods, HostKeyCallback: hostKeyCallback.Wrap(), - Timeout: time.Second * 2, + Timeout: sshConnectTimeout, }, } diff --git a/internal/config/common.go b/internal/config/common.go index 7a72cfe..4e90f7e 100644 --- a/internal/config/common.go +++ b/internal/config/common.go @@ -4,6 +4,8 @@ package config type CommonConfig struct { // The SSH port number SSHPort int + // SSH connection timeout in milliseconds. + SSHConnectTimeoutMs int `json:",omitempty"` // Enable experimental features (mainly for dev purposes) ExperimentalFeaturesEnable bool `json:",omitempty"` // LogDir defines the log directory. @@ -22,6 +24,7 @@ type CommonConfig struct { func newDefaultCommonConfig() *CommonConfig { return &CommonConfig{ SSHPort: DefaultSSHPort, + SSHConnectTimeoutMs: 2000, ExperimentalFeaturesEnable: false, LogDir: "log", Logger: "stdout", diff --git a/internal/config/server.go b/internal/config/server.go index efa7335..6d25965 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -72,6 +72,32 @@ type ServerConfig struct { // better performance through direct writing that bypasses internal channels. // Set this to true only if you experience issues with turbo boost mode. TurboBoostDisable bool `json:",omitempty"` + // Retry interval for glob retries in milliseconds. + ReadGlobRetryIntervalMs int `json:",omitempty"` + // Retry interval for re-reading in tail/cat loops in milliseconds. + ReadRetryIntervalMs int `json:",omitempty"` + // Buffer size used for aggregate read channels. + ReadAggregateLineBufferSize int `json:",omitempty"` + // Delay after turbo processor flush/close to allow data transmission, in milliseconds. + TurboTransmissionDelayMs int `json:",omitempty"` + // Turbo EOF wait base duration in milliseconds. + TurboEOFWaitBaseMs int `json:",omitempty"` + // Turbo EOF wait per-file duration in milliseconds. + TurboEOFWaitPerFileMs int `json:",omitempty"` + // Maximum turbo EOF wait duration in milliseconds. + TurboEOFWaitMaxMs int `json:",omitempty"` + // Turbo channel buffer size. + TurboChannelBufferSize int `json:",omitempty"` + // Turbo channel flush timeout in milliseconds. + TurboFlushTimeoutMs int `json:",omitempty"` + // Turbo channel flush poll interval in milliseconds. + TurboFlushPollIntervalMs int `json:",omitempty"` + // Turbo read retry interval in milliseconds when data is expected but not yet available. + TurboReadRetryIntervalMs int `json:",omitempty"` + // Wait for turbo aggregate serialization during shutdown in milliseconds. + ShutdownTurboSerializeWaitMs int `json:",omitempty"` + // Final idle recheck wait before shutdown in milliseconds. + ShutdownIdleRecheckWaitMs int `json:",omitempty"` } // Create a new default server configuration. @@ -90,7 +116,20 @@ func newDefaultServerConfig() *ServerConfig { Permissions: Permissions{ Default: defaultPermissions, }, - TurboBoostDisable: false, // Default to false, meaning turbo boost is enabled by default + TurboBoostDisable: false, // Default to false, meaning turbo boost is enabled by default + ReadGlobRetryIntervalMs: 5000, + ReadRetryIntervalMs: 2000, + ReadAggregateLineBufferSize: 10000, + TurboTransmissionDelayMs: 50, + TurboEOFWaitBaseMs: 500, + TurboEOFWaitPerFileMs: 10, + TurboEOFWaitMaxMs: 2000, + TurboChannelBufferSize: 1000, + TurboFlushTimeoutMs: 2000, + TurboFlushPollIntervalMs: 10, + TurboReadRetryIntervalMs: 1, + ShutdownTurboSerializeWaitMs: 500, + ShutdownIdleRecheckWaitMs: 10, } } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index dd49f5d..ad2b87f 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -78,7 +78,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, glob string, re regex.Regex, retries int) { - retryInterval := time.Second * 5 + retryInterval := r.server.ReadGlobRetryInterval() glob = filepath.Clean(glob) for retryCount := 0; retryCount < retries; retryCount++ { @@ -146,14 +146,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, // This is especially important when files are queued due to concurrency limits // In serverless mode, data is written directly to stdout, so no wait is needed if !r.server.Serverless() { - waitTime := 500 * time.Millisecond - if len(paths) > 10 { - // For many files, wait proportionally longer - waitTime = time.Duration(len(paths)*10) * time.Millisecond - if waitTime > 2*time.Second { - waitTime = 2 * time.Second - } - } + waitTime := r.server.TurboEOFWaitDuration(len(paths)) dlog.Server.Debug(r.server.LogContext(), "Waiting for data transmission", "duration", waitTime) time.Sleep(waitTime) } @@ -276,7 +269,7 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext } } - time.Sleep(time.Second * 2) + time.Sleep(r.server.ReadRetryInterval()) dlog.Server.Info(path, globID, "Reading file again") } } @@ -288,7 +281,7 @@ func (r *readCommand) readViaChannels() readStrategy { if r.server.HasRegularAggregate() { // For MapReduce operations, create a new channel that goes only to the aggregate. - linesCh = make(chan *line.Line, 10000) + linesCh = make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) r.server.RegisterAggregateLines(linesCh) closeLines = true } else { @@ -332,7 +325,7 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri // Skip this delay in serverless mode since data is written directly to stdout if !r.server.Serverless() { dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission") - time.Sleep(50 * time.Millisecond) + time.Sleep(r.server.TurboDataTransmissionDelay()) } return startErr diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 5160c5c..46f61a4 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -2,6 +2,7 @@ package handlers import ( "sync/atomic" + "time" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr/server" @@ -32,6 +33,13 @@ type readCommandServer interface { FlushTurboData() SignalTurboEOF() GetTurboChannel() chan []byte + ReadGlobRetryInterval() time.Duration + ReadRetryInterval() time.Duration + AggregateLinesChannelBufferSize() int + TurboDataTransmissionDelay() time.Duration + TurboEOFWaitDuration(fileCount int) time.Duration + ShutdownTurboSerializeWait() time.Duration + ShutdownIdleRecheckWait() time.Duration } var _ readCommandServer = (*ServerHandler)(nil) @@ -117,3 +125,65 @@ func (h *ServerHandler) TriggerShutdown() { func (h *ServerHandler) FlushTurboData() { h.flushTurboData() } + +func durationFromMilliseconds(value int, fallback time.Duration) time.Duration { + if value <= 0 { + return fallback + } + return time.Duration(value) * time.Millisecond +} + +func positiveIntOrDefault(value int, fallback int) int { + if value <= 0 { + return fallback + } + return value +} + +func (h *ServerHandler) ReadGlobRetryInterval() time.Duration { + return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second) +} + +func (h *ServerHandler) ReadRetryInterval() time.Duration { + return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second) +} + +func (h *ServerHandler) AggregateLinesChannelBufferSize() int { + return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000) +} + +func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration { + return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond) +} + +func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration { + baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond) + if fileCount <= 10 { + return baseWait + } + + perFileWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitPerFileMs, 10*time.Millisecond) + maxWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitMaxMs, 2*time.Second) + wait := time.Duration(fileCount) * perFileWait + if wait > maxWait { + return maxWait + } + return wait +} + +func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration { + return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond) +} + +func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration { + return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond) +} + +func (h *ServerHandler) turboManagerConfig() turboManagerConfig { + return turboManagerConfig{ + channelBufferSize: positiveIntOrDefault(h.serverCfg.TurboChannelBufferSize, defaultTurboChannelBufferSize), + flushTimeout: durationFromMilliseconds(h.serverCfg.TurboFlushTimeoutMs, defaultTurboFlushTimeout), + flushPollInterval: durationFromMilliseconds(h.serverCfg.TurboFlushPollIntervalMs, defaultTurboFlushPollInterval), + readRetryInterval: durationFromMilliseconds(h.serverCfg.TurboReadRetryIntervalMs, defaultTurboReadRetryInterval), + } +} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index c562c9a..f9aa499 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -58,6 +58,7 @@ func NewServerHandler(user *user.User, catLimiter, } h.handleCommandCb = h.handleUserCommand h.commands = h.newCommandRegistry() + h.turbo.configure(h.turboManagerConfig()) fqdn, err := config.Hostname() if err != nil { diff --git a/internal/server/handlers/shutdown_coordinator.go b/internal/server/handlers/shutdown_coordinator.go index 6d3eda7..973cc61 100644 --- a/internal/server/handlers/shutdown_coordinator.go +++ b/internal/server/handlers/shutdown_coordinator.go @@ -33,13 +33,13 @@ func (c *shutdownCoordinator) finalizeWhenIdle() { turboAggregate.Serialize(context.Background()) // In serverless mode, serialization is synchronous, so no wait needed. if !c.server.Serverless() { - time.Sleep(500 * time.Millisecond) + time.Sleep(c.server.ShutdownTurboSerializeWait()) } } // Double-check that we really have no pending work before shutdown. if !c.server.Serverless() { - time.Sleep(10 * time.Millisecond) + time.Sleep(c.server.ShutdownIdleRecheckWait()) } finalPending, finalActive := c.server.PendingAndActive() if finalPending == 0 && finalActive == 0 { diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go index 7fad042..9d36359 100644 --- a/internal/server/handlers/turbo_manager.go +++ b/internal/server/handlers/turbo_manager.go @@ -7,17 +7,79 @@ import ( user "github.com/mimecast/dtail/internal/user/server" ) +const ( + defaultTurboChannelBufferSize = 1000 + defaultTurboFlushTimeout = 2 * time.Second + defaultTurboFlushPollInterval = 10 * time.Millisecond + defaultTurboReadRetryInterval = time.Millisecond +) + +type turboManagerConfig struct { + channelBufferSize int + flushTimeout time.Duration + flushPollInterval time.Duration + readRetryInterval time.Duration +} + type turboManager struct { mode bool lines chan []byte buffer []byte eof chan struct{} + + channelBufferSize int + flushTimeout time.Duration + flushPollInterval time.Duration + readRetryInterval time.Duration +} + +func (t *turboManager) configure(cfg turboManagerConfig) { + if cfg.channelBufferSize > 0 { + t.channelBufferSize = cfg.channelBufferSize + } + if cfg.flushTimeout > 0 { + t.flushTimeout = cfg.flushTimeout + } + if cfg.flushPollInterval > 0 { + t.flushPollInterval = cfg.flushPollInterval + } + if cfg.readRetryInterval > 0 { + t.readRetryInterval = cfg.readRetryInterval + } +} + +func (t *turboManager) resolvedChannelBufferSize() int { + if t.channelBufferSize > 0 { + return t.channelBufferSize + } + return defaultTurboChannelBufferSize +} + +func (t *turboManager) resolvedFlushTimeout() time.Duration { + if t.flushTimeout > 0 { + return t.flushTimeout + } + return defaultTurboFlushTimeout +} + +func (t *turboManager) resolvedFlushPollInterval() time.Duration { + if t.flushPollInterval > 0 { + return t.flushPollInterval + } + return defaultTurboFlushPollInterval +} + +func (t *turboManager) resolvedReadRetryInterval() time.Duration { + if t.readRetryInterval > 0 { + return t.readRetryInterval + } + return defaultTurboReadRetryInterval } func (t *turboManager) enable() { t.mode = true if t.lines == nil { - t.lines = make(chan []byte, 1000) // Large buffer for performance + t.lines = make(chan []byte, t.resolvedChannelBufferSize()) } // Always create a new EOF channel for each batch of files. t.eof = make(chan struct{}) @@ -62,7 +124,7 @@ func (t *turboManager) flush(user *user.User) { dlog.Server.Debug(user, "Flushing turbo data", "channelLen", len(t.lines)) - timeout := time.After(2 * time.Second) + timeout := time.After(t.resolvedFlushTimeout()) for { select { case <-timeout: @@ -74,7 +136,7 @@ func (t *turboManager) flush(user *user.User) { return } // Give the reader time to process. - time.Sleep(10 * time.Millisecond) + time.Sleep(t.resolvedFlushPollInterval()) } } } @@ -113,7 +175,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) default: if channelLen > 0 { dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting") - time.Sleep(time.Millisecond) + time.Sleep(t.resolvedReadRetryInterval()) select { case turboData := <-t.lines: dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) |
