summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 09:50:13 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 09:50:13 +0200
commit7fee96ccd4328eb619a9c586802a2caba68c12fc (patch)
tree130e393a11157de485700d34b4c4d01e8604f62c /internal
parent5e8d0454d1aa5388a7045bdcb3069e41e4474957 (diff)
refactor(handlers): decouple turbo network writer from base handler
Diffstat (limited to 'internal')
-rw-r--r--internal/server/handlers/readcommand.go9
-rw-r--r--internal/server/handlers/turbo_writer.go36
2 files changed, 21 insertions, 24 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 86ae708..6399e91 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -264,10 +264,11 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
// In server mode, use the network writer with turbo channels
// Create a new instance for each file to ensure thread safety
writer = &TurboNetworkWriter{
- handler: &r.server.baseHandler,
- hostname: r.server.hostname,
- plain: r.server.plain,
- serverless: r.server.serverless,
+ turboLines: r.server.GetTurboChannel(),
+ serverMessages: r.server.serverMessages,
+ hostname: r.server.hostname,
+ plain: r.server.plain,
+ serverless: r.server.serverless,
}
}
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index 37b40d0..c687edf 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -309,19 +309,17 @@ func (w *TurboChannelWriter) Stats() (linesWritten, bytesWritten uint64) {
// TurboNetworkWriter writes directly to the network connection bypassing channels
type TurboNetworkWriter struct {
- handler *baseHandler
- hostname string
- plain bool
- serverless bool
+ turboLines chan<- []byte
+ serverMessages chan<- string
+ hostname string
+ plain bool
+ serverless bool
// Internal buffer for batching writes
writeBuf bytes.Buffer
bufSize int
mutex sync.Mutex
- // Direct output channel for turbo mode
- outputChan chan []byte
-
// Stats
linesWritten uint64
bytesWritten uint64
@@ -354,8 +352,7 @@ func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, s
// sendToTurboChannel sends buffered data to the turbo channel with retry logic.
// Handles channel backpressure by waiting and retrying. Must be called with mutex held.
func (w *TurboNetworkWriter) sendToTurboChannel() error {
- turboCh := w.handler.GetTurboChannel()
- if turboCh == nil {
+ if w.turboLines == nil {
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "turboLines channel is nil")
w.writeBuf.Reset()
return nil
@@ -368,7 +365,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// Send data to turbo channel, retry once if full
select {
- case turboCh <- data:
+ case w.turboLines <- data:
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
w.writeBuf.Reset()
return nil
@@ -376,7 +373,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// Channel full, wait a bit and retry
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
time.Sleep(time.Millisecond)
- turboCh <- data
+ w.turboLines <- data
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry")
w.writeBuf.Reset()
return nil
@@ -387,9 +384,9 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
func (w *TurboNetworkWriter) WriteServerMessage(message string) error {
// Server messages are less critical in turbo mode
// We can send them through the normal channel
- if w.handler != nil && w.handler.serverMessages != nil {
+ if w.serverMessages != nil {
select {
- case w.handler.serverMessages <- message:
+ case w.serverMessages <- message:
return nil
default:
return fmt.Errorf("server message channel full")
@@ -409,13 +406,12 @@ func (w *TurboNetworkWriter) Flush() error {
if w.writeBuf.Len() > 0 {
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len())
- turboCh := w.handler.GetTurboChannel()
- if turboCh != nil {
+ if w.turboLines != nil {
data := make([]byte, w.writeBuf.Len())
copy(data, w.writeBuf.Bytes())
// Force send the data
- turboCh <- data
+ w.turboLines <- data
w.writeBuf.Reset()
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
}
@@ -423,13 +419,13 @@ func (w *TurboNetworkWriter) Flush() error {
// Wait for the channel to have some space to ensure data is being processed
// Don't close the EOF channel here as it may be used for multiple files
- if w.handler.GetTurboChannel() != nil {
+ if w.turboLines != nil {
// Wait until channel has been drained somewhat
- for i := 0; i < 100 && w.handler.TurboChannelLen() > 900; i++ {
- dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", w.handler.TurboChannelLen())
+ for i := 0; i < 100 && len(w.turboLines) > 900; i++ {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.turboLines))
time.Sleep(10 * time.Millisecond)
}
- dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", w.handler.TurboChannelLen())
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.turboLines))
}
// Wait a bit to ensure data is processed