summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 10:13:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 10:13:50 +0200
commit3389e64c2fc2d7bdafb8d1d48118bdaae94a8190 (patch)
tree2fdc199472c673c798f764c246067344f228c83b /internal
parentd32b03d3bf562dcc6b3b83055c9aac6fb852fd14 (diff)
refactor: split turbo read processor construction in readcommand
Diffstat (limited to 'internal')
-rw-r--r--internal/server/handlers/readcommand.go76
1 files changed, 39 insertions, 37 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 6399e91..9ad7a6b 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -27,6 +27,12 @@ type readCommand struct {
type readStrategy func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error
+type turboReadProcessor interface {
+ ProcessLine(*bytes.Buffer, uint64, string) error
+ Flush() error
+ Close() error
+}
+
func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
return &readCommand{
server: server,
@@ -248,29 +254,8 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
r.logRegexMode(re)
- // Enable turbo mode if not already enabled
- if !r.server.IsTurboMode() {
- r.server.EnableTurboMode()
- }
-
- // Create a direct writer based on the mode
- // Each file gets its own writer instance to avoid race conditions
- // when multiple files are processed concurrently
- var writer TurboWriter
- if r.server.serverless {
- // In serverless mode, write directly to stdout
- writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
- } else {
- // In server mode, use the network writer with turbo channels
- // Create a new instance for each file to ensure thread safety
- writer = &TurboNetworkWriter{
- turboLines: r.server.GetTurboChannel(),
- serverMessages: r.server.serverMessages,
- hostname: r.server.hostname,
- plain: r.server.plain,
- serverless: r.server.serverless,
- }
- }
+ r.ensureTurboModeEnabled()
+ writer := r.makeTurboWriter()
r.executeReadLoop(ctx, ltx, path, globID, re, reader, r.readViaTurboProcessor(path, globID, writer))
}
@@ -325,20 +310,7 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error {
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
- var processor interface {
- ProcessLine(*bytes.Buffer, uint64, string) error
- Flush() error
- Close() error
- }
-
- if r.server.turboAggregate != nil {
- // Use turbo aggregate processor for MapReduce operations.
- dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
- processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
- } else {
- // Use direct line processor for cat/grep/tail.
- processor = NewDirectLineProcessor(writer, globID)
- }
+ processor := r.makeTurboProcessor(path, globID, writer)
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
startErr := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
@@ -367,6 +339,36 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
}
}
+func (r *readCommand) ensureTurboModeEnabled() {
+ if !r.server.IsTurboMode() {
+ r.server.EnableTurboMode()
+ }
+}
+
+func (r *readCommand) makeTurboWriter() TurboWriter {
+ // Create a writer instance per file to keep concurrent processing isolated.
+ if r.server.serverless {
+ return NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
+ }
+
+ return &TurboNetworkWriter{
+ turboLines: r.server.GetTurboChannel(),
+ serverMessages: r.server.serverMessages,
+ hostname: r.server.hostname,
+ plain: r.server.plain,
+ serverless: r.server.serverless,
+ }
+}
+
+func (r *readCommand) makeTurboProcessor(path, globID string, writer TurboWriter) turboReadProcessor {
+ if r.server.turboAggregate != nil {
+ dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
+ return server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
+ }
+
+ return NewDirectLineProcessor(writer, globID)
+}
+
func (r *readCommand) logRegexMode(re regex.Regex) {
if r.mode != omode.GrepClient {
return