summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-30 23:40:38 +0300
committerPaul Buetow <paul@buetow.org>2025-06-30 23:40:38 +0300
commit7a917e6e81bf8e956eff2a4a54e9300ab2747949 (patch)
treeac55c6accd1cca2df40529db2cdd26094d4f5ee0 /internal/server
parentb4ca43d97c83c3b9da7138b3b4d6f6cce6fed370 (diff)
fix: resolve channel close panic and improve turbo mode synchronization
- Remove problematic close(turboEOF) call from TurboNetworkWriter.Flush() that was causing "close of closed channel" panic when processing multiple files
- Add proper EOF signaling in readFiles() after all files are processed
- Always create new turboEOF channel for each batch to ensure clean state
- Increase flush timeout iterations for turbo mode to handle large file batches
- Add wait time after EOF signal to ensure data transmission completes

This fixes the panic that occurred in TestDCat2 when processing the same file multiple times: the TurboNetworkWriter instance was reused across files and attempted to close the same channel more than once.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go17
-rw-r--r--internal/server/handlers/readcommand.go18
-rw-r--r--internal/server/handlers/turbo_writer.go16
3 files changed, 35 insertions, 16 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index b756201..a82c91a 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -366,12 +366,19 @@ func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount)
return lineCount + serverCount + maprCount + turboCount
}
- for i := 0; i < 100; i++ { // Increase iterations for turbo mode
+
+ // Increase iterations for turbo mode to handle large file batches
+ maxIterations := 100
+ if h.turboMode {
+ maxIterations = 300 // Give more time for turbo mode to drain
+ }
+
+ for i := 0; i < maxIterations; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
}
- dlog.Server.Debug(h.user, "Still lines to be sent")
+ dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", numUnsentMessages())
time.Sleep(time.Millisecond * 10)
}
dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages())
@@ -412,9 +419,9 @@ func (h *baseHandler) EnableTurboMode() {
if h.turboLines == nil {
h.turboLines = make(chan []byte, 1000) // Large buffer for performance
}
- if h.turboEOF == nil {
- h.turboEOF = make(chan struct{})
- }
+ // Always create a new turboEOF channel for each batch of files
+ // This ensures proper synchronization when processing multiple file batches
+ h.turboEOF = make(chan struct{})
}
// IsTurboMode returns true if turbo mode is enabled
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index dc11aa9..91bd7c3 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -110,6 +110,24 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
}
wg.Wait()
+ // In turbo mode, signal EOF after all files are processed
+ // This is crucial for proper shutdown in server mode
+ turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
+ if turboBoostEnabled && r.server.aggregate == nil &&
+ (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
+ if r.server.IsTurboMode() && r.server.turboEOF != nil {
+ // Signal EOF by closing the channel, but only if it hasn't been closed yet
+ select {
+ case <-r.server.turboEOF:
+ // Already closed
+ default:
+ close(r.server.turboEOF)
+ }
+ // Wait longer to ensure all data is transmitted for large batches
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+
// In turbo mode with aggregate, we don't close the shared channel here
// because it will be used across multiple invocations
// The aggregate will handle channel closure when it's done
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index 01755a5..96ad964 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -348,9 +348,6 @@ type TurboNetworkWriter struct {
// Stats
linesWritten uint64
bytesWritten uint64
-
- // Track if we've signaled EOF
- eofSignaled bool
}
// WriteLineData formats and writes line data directly
@@ -431,7 +428,7 @@ func (w *TurboNetworkWriter) WriteServerMessage(message string) error {
// Flush ensures all data is written
func (w *TurboNetworkWriter) Flush() error {
- dlog.Server.Trace("TurboNetworkWriter.Flush", "called", "eofSignaled", w.eofSignaled)
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "called")
w.mutex.Lock()
defer w.mutex.Unlock()
@@ -451,18 +448,15 @@ func (w *TurboNetworkWriter) Flush() error {
}
}
- // Wait for the channel to have space before signaling EOF
- // This ensures data has been sent
- if !w.eofSignaled && w.handler.turboEOF != nil {
+ // Wait for the channel to have some space to ensure data is being processed
+ // Don't close the EOF channel here as it may be used for multiple files
+ if w.handler.turboLines != nil {
// Wait until channel has been drained somewhat
for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ {
dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines))
time.Sleep(10 * time.Millisecond)
}
-
- dlog.Server.Trace("TurboNetworkWriter.Flush", "signaling EOF", "channelLen", len(w.handler.turboLines))
- close(w.handler.turboEOF)
- w.eofSignaled = true
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.handler.turboLines))
}
// Wait a bit to ensure data is processed