diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 09:50:13 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 09:50:13 +0200 |
| commit | 7fee96ccd4328eb619a9c586802a2caba68c12fc (patch) | |
| tree | 130e393a11157de485700d34b4c4d01e8604f62c /internal | |
| parent | 5e8d0454d1aa5388a7045bdcb3069e41e4474957 (diff) | |
refactor(handlers): decouple turbo network writer from base handler
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 9 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 36 |
2 files changed, 21 insertions, 24 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 86ae708..6399e91 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -264,10 +264,11 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L // In server mode, use the network writer with turbo channels // Create a new instance for each file to ensure thread safety writer = &TurboNetworkWriter{ - handler: &r.server.baseHandler, - hostname: r.server.hostname, - plain: r.server.plain, - serverless: r.server.serverless, + turboLines: r.server.GetTurboChannel(), + serverMessages: r.server.serverMessages, + hostname: r.server.hostname, + plain: r.server.plain, + serverless: r.server.serverless, } } diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index 37b40d0..c687edf 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -309,19 +309,17 @@ func (w *TurboChannelWriter) Stats() (linesWritten, bytesWritten uint64) { // TurboNetworkWriter writes directly to the network connection bypassing channels type TurboNetworkWriter struct { - handler *baseHandler - hostname string - plain bool - serverless bool + turboLines chan<- []byte + serverMessages chan<- string + hostname string + plain bool + serverless bool // Internal buffer for batching writes writeBuf bytes.Buffer bufSize int mutex sync.Mutex - // Direct output channel for turbo mode - outputChan chan []byte - // Stats linesWritten uint64 bytesWritten uint64 @@ -354,8 +352,7 @@ func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, s // sendToTurboChannel sends buffered data to the turbo channel with retry logic. // Handles channel backpressure by waiting and retrying. Must be called with mutex held. func (w *TurboNetworkWriter) sendToTurboChannel() error { - turboCh := w.handler.GetTurboChannel() - if turboCh == nil { + if w.turboLines == nil { dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "turboLines channel is nil") w.writeBuf.Reset() return nil @@ -368,7 +365,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // Send data to turbo channel, retry once if full select { - case turboCh <- data: + case w.turboLines <- data: dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully") w.writeBuf.Reset() return nil @@ -376,7 +373,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // Channel full, wait a bit and retry dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry") time.Sleep(time.Millisecond) - turboCh <- data + w.turboLines <- data dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry") w.writeBuf.Reset() return nil @@ -387,9 +384,9 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { func (w *TurboNetworkWriter) WriteServerMessage(message string) error { // Server messages are less critical in turbo mode // We can send them through the normal channel - if w.handler != nil && w.handler.serverMessages != nil { + if w.serverMessages != nil { select { - case w.handler.serverMessages <- message: + case w.serverMessages <- message: return nil default: return fmt.Errorf("server message channel full") @@ -409,13 +406,12 @@ func (w *TurboNetworkWriter) Flush() error { if w.writeBuf.Len() > 0 { dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len()) - turboCh := w.handler.GetTurboChannel() - if turboCh != nil { + if w.turboLines != nil { data := make([]byte, w.writeBuf.Len()) copy(data, w.writeBuf.Bytes()) // Force send the data - turboCh <- data + w.turboLines <- data w.writeBuf.Reset() dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel") } @@ -423,13 +419,13 @@ func (w *TurboNetworkWriter) Flush() error { // Wait for the channel to have some space to ensure data is being processed // Don't close the EOF channel here as it may be used for multiple files - if w.handler.GetTurboChannel() != nil { + if w.turboLines != nil { // Wait until channel has been drained somewhat - for i := 0; i < 100 && w.handler.TurboChannelLen() > 900; i++ { - dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", w.handler.TurboChannelLen()) + for i := 0; i < 100 && len(w.turboLines) > 900; i++ { + dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.turboLines)) time.Sleep(10 * time.Millisecond) } - dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", w.handler.TurboChannelLen()) + dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.turboLines)) } // Wait a bit to ensure data is processed |
