diff options
| -rw-r--r-- | internal/config/server.go | 3 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 5 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 18 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 7 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_manager.go | 37 |
5 files changed, 64 insertions, 6 deletions
diff --git a/internal/config/server.go b/internal/config/server.go index 6d25965..80b52e9 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -94,6 +94,8 @@ type ServerConfig struct { TurboFlushPollIntervalMs int `json:",omitempty"` // Turbo read retry interval in milliseconds when data is expected but not yet available. TurboReadRetryIntervalMs int `json:",omitempty"` + // Maximum time to wait for turbo EOF acknowledgement after signaling EOF, in milliseconds. + TurboEOFAckTimeoutMs int `json:",omitempty"` // Wait for turbo aggregate serialization during shutdown in milliseconds. ShutdownTurboSerializeWaitMs int `json:",omitempty"` // Final idle recheck wait before shutdown in milliseconds. @@ -128,6 +130,7 @@ func newDefaultServerConfig() *ServerConfig { TurboFlushTimeoutMs: 2000, TurboFlushPollIntervalMs: 10, TurboReadRetryIntervalMs: 1, + TurboEOFAckTimeoutMs: 2000, ShutdownTurboSerializeWaitMs: 500, ShutdownIdleRecheckWaitMs: 10, } diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index a61da48..f21262e 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -366,3 +366,8 @@ func (h *baseHandler) GetTurboChannel() chan []byte { func (h *baseHandler) TurboChannelLen() int { return h.turbo.channelLen() } + +// WaitForTurboEOFAck waits until turbo reader acknowledges EOF or timeout. +func (h *baseHandler) WaitForTurboEOFAck(timeout time.Duration) bool { + return h.turbo.waitForEOFAck(timeout) +} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index ad2b87f..6078e37 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -142,13 +142,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, // Signal EOF by closing the channel, but only once. r.server.SignalTurboEOF() - // Wait to ensure all data is transmitted - // This is especially important when files are queued due to concurrency limits - // In serverless mode, data is written directly to stdout, so no wait is needed + // Wait for an explicit reader acknowledgement instead of timing guesses. if !r.server.Serverless() { - waitTime := r.server.TurboEOFWaitDuration(len(paths)) - dlog.Server.Debug(r.server.LogContext(), "Waiting for data transmission", "duration", waitTime) - time.Sleep(waitTime) + timeout := r.server.TurboEOFAckTimeout() + if r.server.WaitForTurboEOFAck(timeout) { + dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged") + } else { + dlog.Server.Warn( + r.server.LogContext(), + "Timeout waiting for turbo EOF acknowledgement", + "timeout", timeout, + "remainingTurbo", r.server.TurboChannelLen(), + ) + } } } } diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 46f61a4..2c0a616 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -33,6 +33,8 @@ type readCommandServer interface { FlushTurboData() SignalTurboEOF() GetTurboChannel() chan []byte + TurboChannelLen() int + WaitForTurboEOFAck(timeout time.Duration) bool ReadGlobRetryInterval() time.Duration ReadRetryInterval() time.Duration AggregateLinesChannelBufferSize() int @@ -40,6 +42,7 @@ type readCommandServer interface { TurboEOFWaitDuration(fileCount int) time.Duration ShutdownTurboSerializeWait() time.Duration ShutdownIdleRecheckWait() time.Duration + TurboEOFAckTimeout() time.Duration } var _ readCommandServer = (*ServerHandler)(nil) @@ -126,6 +129,10 @@ func (h *ServerHandler) FlushTurboData() { h.flushTurboData() } +func (h *ServerHandler) TurboEOFAckTimeout() time.Duration { + return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second) +} + func durationFromMilliseconds(value int, fallback time.Duration) time.Duration { if value <= 0 { return fallback diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go index 9d36359..e5cbf1a 100644 --- a/internal/server/handlers/turbo_manager.go +++ b/internal/server/handlers/turbo_manager.go @@ -26,6 +26,7 @@ type turboManager struct { lines chan []byte buffer []byte eof chan struct{} + eofAck chan struct{} channelBufferSize int flushTimeout time.Duration @@ -83,6 +84,7 @@ func (t *turboManager) enable() { } // Always create a new EOF channel for each batch of files. t.eof = make(chan struct{}) + t.eofAck = make(chan struct{}) } func (t *turboManager) enabled() bool { @@ -106,6 +108,40 @@ func (t *turboManager) signalEOF() { } } +func (t *turboManager) signalEOFAck() { + if t.eofAck == nil { + return + } + + select { + case <-t.eofAck: + // Already closed. + default: + close(t.eofAck) + } +} + +func (t *turboManager) waitForEOFAck(timeout time.Duration) bool { + if t.eofAck == nil { + return true + } + + if timeout <= 0 { + <-t.eofAck + return true + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-t.eofAck: + return true + case <-timer.C: + return false + } +} + func (t *turboManager) channel() chan []byte { return t.lines } @@ -194,6 +230,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) case <-t.eof: dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") t.mode = false + t.signalEOFAck() default: } } |
