summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/config/server.go3
-rw-r--r--internal/server/handlers/basehandler.go5
-rw-r--r--internal/server/handlers/readcommand.go18
-rw-r--r--internal/server/handlers/readcommand_server.go7
-rw-r--r--internal/server/handlers/turbo_manager.go37
5 files changed, 64 insertions, 6 deletions
diff --git a/internal/config/server.go b/internal/config/server.go
index 6d25965..80b52e9 100644
--- a/internal/config/server.go
+++ b/internal/config/server.go
@@ -94,6 +94,8 @@ type ServerConfig struct {
TurboFlushPollIntervalMs int `json:",omitempty"`
// Turbo read retry interval in milliseconds when data is expected but not yet available.
TurboReadRetryIntervalMs int `json:",omitempty"`
+ // Maximum time to wait for turbo EOF acknowledgement after signaling EOF, in milliseconds.
+ TurboEOFAckTimeoutMs int `json:",omitempty"`
// Wait for turbo aggregate serialization during shutdown in milliseconds.
ShutdownTurboSerializeWaitMs int `json:",omitempty"`
// Final idle recheck wait before shutdown in milliseconds.
@@ -128,6 +130,7 @@ func newDefaultServerConfig() *ServerConfig {
TurboFlushTimeoutMs: 2000,
TurboFlushPollIntervalMs: 10,
TurboReadRetryIntervalMs: 1,
+ TurboEOFAckTimeoutMs: 2000,
ShutdownTurboSerializeWaitMs: 500,
ShutdownIdleRecheckWaitMs: 10,
}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index a61da48..f21262e 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -366,3 +366,8 @@ func (h *baseHandler) GetTurboChannel() chan []byte {
func (h *baseHandler) TurboChannelLen() int {
return h.turbo.channelLen()
}
+
+// WaitForTurboEOFAck waits until turbo reader acknowledges EOF or timeout.
+func (h *baseHandler) WaitForTurboEOFAck(timeout time.Duration) bool {
+ return h.turbo.waitForEOFAck(timeout)
+}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index ad2b87f..6078e37 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -142,13 +142,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
// Signal EOF by closing the channel, but only once.
r.server.SignalTurboEOF()
- // Wait to ensure all data is transmitted
- // This is especially important when files are queued due to concurrency limits
- // In serverless mode, data is written directly to stdout, so no wait is needed
+ // Wait for an explicit reader acknowledgement instead of timing guesses.
if !r.server.Serverless() {
- waitTime := r.server.TurboEOFWaitDuration(len(paths))
- dlog.Server.Debug(r.server.LogContext(), "Waiting for data transmission", "duration", waitTime)
- time.Sleep(waitTime)
+ timeout := r.server.TurboEOFAckTimeout()
+ if r.server.WaitForTurboEOFAck(timeout) {
+ dlog.Server.Debug(r.server.LogContext(), "Turbo EOF acknowledged")
+ } else {
+ dlog.Server.Warn(
+ r.server.LogContext(),
+ "Timeout waiting for turbo EOF acknowledgement",
+ "timeout", timeout,
+ "remainingTurbo", r.server.TurboChannelLen(),
+ )
+ }
}
}
}
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 46f61a4..2c0a616 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -33,6 +33,8 @@ type readCommandServer interface {
FlushTurboData()
SignalTurboEOF()
GetTurboChannel() chan []byte
+ TurboChannelLen() int
+ WaitForTurboEOFAck(timeout time.Duration) bool
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
AggregateLinesChannelBufferSize() int
@@ -40,6 +42,7 @@ type readCommandServer interface {
TurboEOFWaitDuration(fileCount int) time.Duration
ShutdownTurboSerializeWait() time.Duration
ShutdownIdleRecheckWait() time.Duration
+ TurboEOFAckTimeout() time.Duration
}
var _ readCommandServer = (*ServerHandler)(nil)
@@ -126,6 +129,10 @@ func (h *ServerHandler) FlushTurboData() {
h.flushTurboData()
}
+func (h *ServerHandler) TurboEOFAckTimeout() time.Duration {
+ return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second)
+}
+
func durationFromMilliseconds(value int, fallback time.Duration) time.Duration {
if value <= 0 {
return fallback
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go
index 9d36359..e5cbf1a 100644
--- a/internal/server/handlers/turbo_manager.go
+++ b/internal/server/handlers/turbo_manager.go
@@ -26,6 +26,7 @@ type turboManager struct {
lines chan []byte
buffer []byte
eof chan struct{}
+ eofAck chan struct{}
channelBufferSize int
flushTimeout time.Duration
@@ -83,6 +84,7 @@ func (t *turboManager) enable() {
}
// Always create a new EOF channel for each batch of files.
t.eof = make(chan struct{})
+ t.eofAck = make(chan struct{})
}
func (t *turboManager) enabled() bool {
@@ -106,6 +108,40 @@ func (t *turboManager) signalEOF() {
}
}
+func (t *turboManager) signalEOFAck() {
+ if t.eofAck == nil {
+ return
+ }
+
+ select {
+ case <-t.eofAck:
+ // Already closed.
+ default:
+ close(t.eofAck)
+ }
+}
+
+func (t *turboManager) waitForEOFAck(timeout time.Duration) bool {
+ if t.eofAck == nil {
+ return true
+ }
+
+ if timeout <= 0 {
+ <-t.eofAck
+ return true
+ }
+
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+
+ select {
+ case <-t.eofAck:
+ return true
+ case <-timer.C:
+ return false
+ }
+}
+
func (t *turboManager) channel() chan []byte {
return t.lines
}
@@ -194,6 +230,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
case <-t.eof:
dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
t.mode = false
+ t.signalEOFAck()
default:
}
}