diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:13:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:13:50 +0200 |
| commit | 3389e64c2fc2d7bdafb8d1d48118bdaae94a8190 (patch) | |
| tree | 2fdc199472c673c798f764c246067344f228c83b /internal | |
| parent | d32b03d3bf562dcc6b3b83055c9aac6fb852fd14 (diff) | |
refactor: split turbo read processor construction in readcommand
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 76 |
1 files changed, 39 insertions, 37 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 6399e91..9ad7a6b 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -27,6 +27,12 @@ type readCommand struct { type readStrategy func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error +type turboReadProcessor interface { + ProcessLine(*bytes.Buffer, uint64, string) error + Flush() error + Close() error +} + func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { return &readCommand{ server: server, @@ -248,29 +254,8 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID) r.logRegexMode(re) - // Enable turbo mode if not already enabled - if !r.server.IsTurboMode() { - r.server.EnableTurboMode() - } - - // Create a direct writer based on the mode - // Each file gets its own writer instance to avoid race conditions - // when multiple files are processed concurrently - var writer TurboWriter - if r.server.serverless { - // In serverless mode, write directly to stdout - writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) - } else { - // In server mode, use the network writer with turbo channels - // Create a new instance for each file to ensure thread safety - writer = &TurboNetworkWriter{ - turboLines: r.server.GetTurboChannel(), - serverMessages: r.server.serverMessages, - hostname: r.server.hostname, - plain: r.server.plain, - serverless: r.server.serverless, - } - } + r.ensureTurboModeEnabled() + writer := r.makeTurboWriter() r.executeReadLoop(ctx, ltx, path, globID, re, reader, r.readViaTurboProcessor(path, globID, writer)) } @@ -325,20 +310,7 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration") - var processor interface { - ProcessLine(*bytes.Buffer, uint64, string) error - Flush() error - Close() error - } - - if r.server.turboAggregate != nil { - // Use turbo aggregate processor for MapReduce operations. - dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID) - processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID) - } else { - // Use direct line processor for cat/grep/tail. - processor = NewDirectLineProcessor(writer, globID) - } + processor := r.makeTurboProcessor(path, globID, writer) dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start") startErr := reader.StartWithProcessorOptimized(ctx, ltx, processor, re) @@ -367,6 +339,36 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri } } +func (r *readCommand) ensureTurboModeEnabled() { + if !r.server.IsTurboMode() { + r.server.EnableTurboMode() + } +} + +func (r *readCommand) makeTurboWriter() TurboWriter { + // Create a writer instance per file to keep concurrent processing isolated. + if r.server.serverless { + return NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) + } + + return &TurboNetworkWriter{ + turboLines: r.server.GetTurboChannel(), + serverMessages: r.server.serverMessages, + hostname: r.server.hostname, + plain: r.server.plain, + serverless: r.server.serverless, + } +} + +func (r *readCommand) makeTurboProcessor(path, globID string, writer TurboWriter) turboReadProcessor { + if r.server.turboAggregate != nil { + dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID) + return server.NewTurboAggregateProcessor(r.server.turboAggregate, globID) + } + + return NewDirectLineProcessor(writer, globID) +} + func (r *readCommand) logRegexMode(re regex.Regex) { if r.mode != omode.GrepClient { return |
