diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-30 23:40:38 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-30 23:40:38 +0300 |
| commit | 7a917e6e81bf8e956eff2a4a54e9300ab2747949 (patch) | |
| tree | ac55c6accd1cca2df40529db2cdd26094d4f5ee0 /internal/server/handlers | |
| parent | b4ca43d97c83c3b9da7138b3b4d6f6cce6fed370 (diff) | |
fix: resolve channel close panic and improve turbo mode synchronization
- Remove problematic close(turboEOF) call from TurboNetworkWriter.Flush()
  that was causing a "close of closed channel" panic when processing multiple files
- Add proper EOF signaling in readFiles() after all files are processed
- Always create new turboEOF channel for each batch to ensure clean state
- Increase flush timeout iterations for turbo mode to handle large file batches
- Add wait time after EOF signal to ensure data transmission completes
This fixes the panic that occurred in TestDCat2 when processing the same
file multiple times, where the TurboNetworkWriter instance was reused
and attempted to close the same channel multiple times.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 17 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 18 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 16 |
3 files changed, 35 insertions, 16 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index b756201..a82c91a 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -366,12 +366,19 @@ func (h *baseHandler) flush() { dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount) return lineCount + serverCount + maprCount + turboCount } - for i := 0; i < 100; i++ { // Increase iterations for turbo mode + + // Increase iterations for turbo mode to handle large file batches + maxIterations := 100 + if h.turboMode { + maxIterations = 300 // Give more time for turbo mode to drain + } + + for i := 0; i < maxIterations; i++ { if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) return } - dlog.Server.Debug(h.user, "Still lines to be sent") + dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", numUnsentMessages()) time.Sleep(time.Millisecond * 10) } dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages()) @@ -412,9 +419,9 @@ func (h *baseHandler) EnableTurboMode() { if h.turboLines == nil { h.turboLines = make(chan []byte, 1000) // Large buffer for performance } - if h.turboEOF == nil { - h.turboEOF = make(chan struct{}) - } + // Always create a new turboEOF channel for each batch of files + // This ensures proper synchronization when processing multiple file batches + h.turboEOF = make(chan struct{}) } // IsTurboMode returns true if turbo mode is enabled diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index dc11aa9..91bd7c3 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -110,6 +110,24 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, } wg.Wait() + // In turbo mode, signal EOF after all files are processed + // This is crucial for proper shutdown in 
server mode + turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE") + if turboBoostEnabled && r.server.aggregate == nil && + (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { + if r.server.IsTurboMode() && r.server.turboEOF != nil { + // Signal EOF by closing the channel, but only if it hasn't been closed yet + select { + case <-r.server.turboEOF: + // Already closed + default: + close(r.server.turboEOF) + } + // Wait longer to ensure all data is transmitted for large batches + time.Sleep(500 * time.Millisecond) + } + } + // In turbo mode with aggregate, we don't close the shared channel here // because it will be used across multiple invocations // The aggregate will handle channel closure when it's done diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index 01755a5..96ad964 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -348,9 +348,6 @@ type TurboNetworkWriter struct { // Stats linesWritten uint64 bytesWritten uint64 - - // Track if we've signaled EOF - eofSignaled bool } // WriteLineData formats and writes line data directly @@ -431,7 +428,7 @@ func (w *TurboNetworkWriter) WriteServerMessage(message string) error { // Flush ensures all data is written func (w *TurboNetworkWriter) Flush() error { - dlog.Server.Trace("TurboNetworkWriter.Flush", "called", "eofSignaled", w.eofSignaled) + dlog.Server.Trace("TurboNetworkWriter.Flush", "called") w.mutex.Lock() defer w.mutex.Unlock() @@ -451,18 +448,15 @@ func (w *TurboNetworkWriter) Flush() error { } } - // Wait for the channel to have space before signaling EOF - // This ensures data has been sent - if !w.eofSignaled && w.handler.turboEOF != nil { + // Wait for the channel to have some space to ensure data is being processed + // Don't close the EOF channel here as it may be used for multiple files + if w.handler.turboLines != nil { // Wait until channel has been drained 
somewhat for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ { dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines)) time.Sleep(10 * time.Millisecond) } - - dlog.Server.Trace("TurboNetworkWriter.Flush", "signaling EOF", "channelLen", len(w.handler.turboLines)) - close(w.handler.turboEOF) - w.eofSignaled = true + dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.handler.turboLines)) } // Wait a bit to ensure data is processed |
