summary | refs | log | tree | commit | diff
path: root/internal
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2026-03-02 09:02:34 +0200
committer Paul Buetow <paul@buetow.org> 2026-03-02 09:02:34 +0200
commit 174bd919ab58e15a1841df428025ea9cc8ef7e3a (patch)
tree 80264b611389cfca486384887c9324eaac34e98e /internal
parent 50a40f6e77e9f9a6f65e0596c789f67b91f6a6e1 (diff)
Extract protocol and turbo responsibilities from baseHandler (task 327)
Diffstat (limited to 'internal')
-rw-r--r-- internal/server/handlers/basehandler.go 196
-rw-r--r-- internal/server/handlers/healthhandler.go 1
-rw-r--r-- internal/server/handlers/protocol_codec.go 72
-rw-r--r-- internal/server/handlers/readcommand.go 11
-rw-r--r-- internal/server/handlers/serverhandler.go 1
-rw-r--r-- internal/server/handlers/turbo_manager.go 142
-rw-r--r-- internal/server/handlers/turbo_writer.go 38
7 files changed, 276 insertions, 185 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 3bb824b..5e9f1ee 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -3,11 +3,8 @@ package handlers
import (
"bytes"
"context"
- "encoding/base64"
- "errors"
"fmt"
"io"
- "strconv"
"strings"
"sync"
"sync/atomic"
@@ -31,13 +28,14 @@ type baseHandler struct {
handleCommandCb handleCommandCb
lines chan *line.Line
aggregate *server.Aggregate
- turboAggregate *server.TurboAggregate // Turbo mode aggregate
+ turboAggregate *server.TurboAggregate // Turbo mode aggregate
maprMessages chan string
serverMessages chan string
hostname string
user *user.User
ackCloseReceived chan struct{}
activeCommands int32
+ codec protocolCodec
readBuf bytes.Buffer
writeBuf bytes.Buffer
@@ -47,12 +45,8 @@ type baseHandler struct {
quiet bool
plain bool
serverless bool
-
- // Turbo mode support
- turboMode bool
- turboLines chan []byte // Pre-formatted lines for turbo mode
- turboBuffer []byte // Buffer for partially sent turbo data
- turboEOF chan struct{} // Signal when turbo data is complete
+
+ turbo turboManager
}
// Shutdown the handler.
@@ -79,66 +73,8 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
- // In turbo mode, check if we have buffered data first
- if h.turboMode && len(h.turboBuffer) > 0 {
- dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer))
- n = copy(p, h.turboBuffer)
- h.turboBuffer = h.turboBuffer[n:]
- dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer))
- return
- }
-
- // In turbo mode, prioritize pre-formatted turbo lines
- if h.turboMode && h.turboLines != nil {
- channelLen := len(h.turboLines)
- dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
-
- // Try to read from the channel
- select {
- case turboData := <-h.turboLines:
- dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
- n = copy(p, turboData)
- // If we couldn't send all data, buffer the rest
- if n < len(turboData) {
- h.turboBuffer = turboData[n:]
- dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer))
- }
- return
- default:
- // No data immediately available
- if channelLen > 0 {
- // There's data in the channel but we couldn't get it immediately
- // Wait a bit and try again
- dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting")
- time.Sleep(time.Millisecond)
- select {
- case turboData := <-h.turboLines:
- dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
- n = copy(p, turboData)
- if n < len(turboData) {
- h.turboBuffer = turboData[n:]
- }
- return
- default:
- // Still no data
- }
- }
-
- // Channel is truly empty, check if we should continue in turbo mode
- // Only disable turbo mode if we've been signaled to do so
- if h.turboEOF != nil {
- select {
- case <-h.turboEOF:
- dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
- h.turboMode = false
- default:
- // EOF not signaled yet, continue in turbo mode
- }
- }
-
- dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through")
- // Fall through to normal processing
- }
+ if n, handled := h.turbo.tryRead(p, h.user); handled {
+ return n, nil
}
select {
@@ -264,54 +200,11 @@ func (h *baseHandler) handleCommand(commandStr string) {
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
- argc := len(args)
- var add string
-
- if argc <= 2 || args[0] != "protocol" {
- return args, argc, add, errors.New("unable to determine protocol version")
- }
-
- if args[1] != protocol.ProtocolCompat {
- clientCompat, _ := strconv.Atoi(args[1])
- serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat)
- if clientCompat <= 3 {
- // Protocol version 3 or lower expect a newline as message separator
- // One day (after 2 major versions) this exception may be removed!
- add = "\n"
- }
-
- toUpdate := "client"
- if clientCompat > serverCompat {
- toUpdate = "server"
- }
- err := fmt.Errorf("the DTail server protocol version '%s' does not match "+
- "client protocol version '%s', please update DTail %s",
- protocol.ProtocolCompat, args[1], toUpdate)
- return args, argc, add, err
- }
-
- return args[2:], argc - 2, add, nil
+ return h.codec.handleProtocolVersion(args)
}
func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, error) {
- err := errors.New("unable to decode client message, DTail server and client " +
- "versions may not be compatible")
- if argc != 2 || args[0] != "base64" {
- return args, argc, err
- }
-
- decoded, err := base64.StdEncoding.DecodeString(args[1])
- if err != nil {
- return args, argc, err
- }
- decodedStr := string(decoded)
-
- args = strings.Split(decodedStr, " ")
- argc = len(args)
- dlog.Server.Trace(h.user, "Base64 decoded received command",
- decodedStr, argc, args)
-
- return args, argc, nil
+ return h.codec.handleBase64(args, argc)
}
func (h *baseHandler) handleAckCommand(argc int, args []string) {
@@ -370,24 +263,21 @@ func (h *baseHandler) flush() {
lineCount := len(h.lines)
serverCount := len(h.serverMessages)
maprCount := len(h.maprMessages)
- turboCount := 0
- if h.turboLines != nil {
- turboCount = len(h.turboLines)
- }
+ turboCount := h.turbo.channelLen()
dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount)
return lineCount + serverCount + maprCount + turboCount
}
-
+
// Increase iterations for turbo mode to handle large file batches
maxIterations := 100
- if h.turboMode {
+ if h.turbo.enabled() {
maxIterations = 300 // Give more time for turbo mode to drain
}
// Also increase iterations if we have MapReduce messages
if h.turboAggregate != nil || h.aggregate != nil {
maxIterations = 300 // Give more time for MapReduce results
}
-
+
for i := 0; i < maxIterations; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
@@ -402,13 +292,13 @@ func (h *baseHandler) flush() {
func (h *baseHandler) shutdown() {
// Log current state at shutdown
activeCommands := atomic.LoadInt32(&h.activeCommands)
- dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode)
-
+ dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turbo.enabled())
+
// In turbo mode, ensure all data is flushed before shutdown
- if h.turboMode {
+ if h.turbo.enabled() {
h.flushTurboData()
}
-
+
// Shutdown aggregates BEFORE flush to ensure MapReduce data is available
if h.turboAggregate != nil {
dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()")
@@ -422,7 +312,7 @@ func (h *baseHandler) shutdown() {
// Give time for serialization to complete
time.Sleep(100 * time.Millisecond)
}
-
+
h.flush()
go func() {
@@ -452,47 +342,35 @@ func (h *baseHandler) decrementActiveCommands() int32 {
// EnableTurboMode enables turbo mode for direct line processing
func (h *baseHandler) EnableTurboMode() {
- h.turboMode = true
- if h.turboLines == nil {
- h.turboLines = make(chan []byte, 1000) // Large buffer for performance
- }
- // Always create a new turboEOF channel for each batch of files
- // This ensures proper synchronization when processing multiple file batches
- h.turboEOF = make(chan struct{})
+ h.turbo.enable()
}
// IsTurboMode returns true if turbo mode is enabled
func (h *baseHandler) IsTurboMode() bool {
- return h.turboMode
+ return h.turbo.enabled()
+}
+
+// HasTurboEOF returns true when a turbo EOF channel exists.
+func (h *baseHandler) HasTurboEOF() bool {
+ return h.turbo.hasEOF()
+}
+
+// SignalTurboEOF closes turbo EOF channel once.
+func (h *baseHandler) SignalTurboEOF() {
+ h.turbo.signalEOF()
}
// flushTurboData ensures all turbo channel data is processed
func (h *baseHandler) flushTurboData() {
- if h.turboLines == nil {
- return
- }
-
- dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines))
-
- // Wait for turbo channel to drain with a timeout
- timeout := time.After(2 * time.Second)
- for {
- select {
- case <-timeout:
- dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines))
- return
- default:
- if len(h.turboLines) == 0 {
- dlog.Server.Debug(h.user, "Turbo channel drained successfully")
- return
- }
- // Give the reader time to process
- time.Sleep(10 * time.Millisecond)
- }
- }
+ h.turbo.flush(h.user)
}
// GetTurboChannel returns the turbo lines channel for direct writing
-func (h *baseHandler) GetTurboChannel() chan<- []byte {
- return h.turboLines
+func (h *baseHandler) GetTurboChannel() chan []byte {
+ return h.turbo.channel()
+}
+
+// TurboChannelLen returns current turbo channel buffered size.
+func (h *baseHandler) TurboChannelLen() int {
+ return h.turbo.channelLen()
}
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index 362fe24..5abb3f4 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -28,6 +28,7 @@ func NewHealthHandler(user *user.User) *HealthHandler {
maprMessages: make(chan string, 10),
ackCloseReceived: make(chan struct{}),
user: user,
+ codec: newProtocolCodec(user),
},
}
h.handleCommandCb = h.handleHealthCommand
diff --git a/internal/server/handlers/protocol_codec.go b/internal/server/handlers/protocol_codec.go
new file mode 100644
index 0000000..192cc81
--- /dev/null
+++ b/internal/server/handlers/protocol_codec.go
@@ -0,0 +1,72 @@
+package handlers
+
+import (
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
+ user "github.com/mimecast/dtail/internal/user/server"
+)
+
+type protocolCodec struct {
+ user *user.User
+}
+
+func newProtocolCodec(user *user.User) protocolCodec {
+ return protocolCodec{user: user}
+}
+
+func (c protocolCodec) handleProtocolVersion(args []string) ([]string, int, string, error) {
+ argc := len(args)
+ var add string
+
+ if argc <= 2 || args[0] != "protocol" {
+ return args, argc, add, errors.New("unable to determine protocol version")
+ }
+
+ if args[1] != protocol.ProtocolCompat {
+ clientCompat, _ := strconv.Atoi(args[1])
+ serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat)
+ if clientCompat <= 3 {
+ // Protocol version 3 or lower expect a newline as message separator
+ // One day (after 2 major versions) this exception may be removed!
+ add = "\n"
+ }
+
+ toUpdate := "client"
+ if clientCompat > serverCompat {
+ toUpdate = "server"
+ }
+ err := fmt.Errorf("the DTail server protocol version '%s' does not match "+
+ "client protocol version '%s', please update DTail %s",
+ protocol.ProtocolCompat, args[1], toUpdate)
+ return args, argc, add, err
+ }
+
+ return args[2:], argc - 2, add, nil
+}
+
+func (c protocolCodec) handleBase64(args []string, argc int) ([]string, int, error) {
+ err := errors.New("unable to decode client message, DTail server and client " +
+ "versions may not be compatible")
+ if argc != 2 || args[0] != "base64" {
+ return args, argc, err
+ }
+
+ decoded, err := base64.StdEncoding.DecodeString(args[1])
+ if err != nil {
+ return args, argc, err
+ }
+ decodedStr := string(decoded)
+
+ args = strings.Split(decodedStr, " ")
+ argc = len(args)
+ dlog.Server.Trace(c.user, "Base64 decoded received command",
+ decodedStr, argc, args)
+
+ return args, argc, nil
+}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 0375807..3410499 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -124,19 +124,14 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
// This is crucial for proper shutdown in server mode
if !r.server.serverCfg.TurboBoostDisable && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
- if r.server.IsTurboMode() && r.server.turboEOF != nil {
+ if r.server.IsTurboMode() && r.server.HasTurboEOF() {
dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal")
// Ensure all turbo data is flushed before signaling EOF
r.server.flushTurboData()
- // Signal EOF by closing the channel, but only if it hasn't been closed yet
- select {
- case <-r.server.turboEOF:
- // Already closed
- default:
- close(r.server.turboEOF)
- }
+ // Signal EOF by closing the channel, but only once.
+ r.server.SignalTurboEOF()
// Wait to ensure all data is transmitted
// This is especially important when files are queued due to concurrency limits
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 92619d7..f40081e 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -46,6 +46,7 @@ func NewServerHandler(user *user.User, catLimiter,
maprMessages: make(chan string, 10),
ackCloseReceived: make(chan struct{}),
user: user,
+ codec: newProtocolCodec(user),
},
catLimiter: catLimiter,
tailLimiter: tailLimiter,
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go
new file mode 100644
index 0000000..7fad042
--- /dev/null
+++ b/internal/server/handlers/turbo_manager.go
@@ -0,0 +1,142 @@
+package handlers
+
+import (
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+ user "github.com/mimecast/dtail/internal/user/server"
+)
+
+type turboManager struct {
+ mode bool
+ lines chan []byte
+ buffer []byte
+ eof chan struct{}
+}
+
+func (t *turboManager) enable() {
+ t.mode = true
+ if t.lines == nil {
+ t.lines = make(chan []byte, 1000) // Large buffer for performance
+ }
+ // Always create a new EOF channel for each batch of files.
+ t.eof = make(chan struct{})
+}
+
+func (t *turboManager) enabled() bool {
+ return t.mode
+}
+
+func (t *turboManager) hasEOF() bool {
+ return t.eof != nil
+}
+
+func (t *turboManager) signalEOF() {
+ if t.eof == nil {
+ return
+ }
+
+ select {
+ case <-t.eof:
+ // Already closed
+ default:
+ close(t.eof)
+ }
+}
+
+func (t *turboManager) channel() chan []byte {
+ return t.lines
+}
+
+func (t *turboManager) channelLen() int {
+ if t.lines == nil {
+ return 0
+ }
+ return len(t.lines)
+}
+
+func (t *turboManager) flush(user *user.User) {
+ if t.lines == nil {
+ return
+ }
+
+ dlog.Server.Debug(user, "Flushing turbo data", "channelLen", len(t.lines))
+
+ timeout := time.After(2 * time.Second)
+ for {
+ select {
+ case <-timeout:
+ dlog.Server.Warn(user, "Timeout while flushing turbo data", "remaining", len(t.lines))
+ return
+ default:
+ if len(t.lines) == 0 {
+ dlog.Server.Debug(user, "Turbo channel drained successfully")
+ return
+ }
+ // Give the reader time to process.
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+// tryRead tries to serve data from turbo state and channels.
+// Returns handled=false when caller should continue with normal path.
+func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) {
+ if !t.mode {
+ return 0, false
+ }
+
+ if len(t.buffer) > 0 {
+ dlog.Server.Trace(user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(t.buffer))
+ n = copy(p, t.buffer)
+ t.buffer = t.buffer[n:]
+ dlog.Server.Trace(user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(t.buffer))
+ return n, true
+ }
+
+ if t.lines == nil {
+ return 0, false
+ }
+
+ channelLen := len(t.lines)
+ dlog.Server.Trace(user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
+
+ select {
+ case turboData := <-t.lines:
+ dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ if n < len(turboData) {
+ t.buffer = turboData[n:]
+ dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer))
+ }
+ return n, true
+ default:
+ if channelLen > 0 {
+ dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting")
+ time.Sleep(time.Millisecond)
+ select {
+ case turboData := <-t.lines:
+ dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ if n < len(turboData) {
+ t.buffer = turboData[n:]
+ }
+ return n, true
+ default:
+ // Still no data.
+ }
+ }
+
+ if t.eof != nil {
+ select {
+ case <-t.eof:
+ dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
+ t.mode = false
+ default:
+ }
+ }
+
+ dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through")
+ return 0, false
+ }
+}
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index 62225bd..d867671 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -200,7 +200,7 @@ func (w *DirectTurboWriter) Flush() error {
// Force flush any remaining data
err := w.flushBuffer()
-
+
// For serverless mode, ensure everything is written to output
if w.serverless {
// Ensure writer is flushed if it supports it
@@ -208,7 +208,7 @@ func (w *DirectTurboWriter) Flush() error {
flusher.Flush()
}
}
-
+
return err
}
@@ -421,7 +421,8 @@ func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, s
// sendToTurboChannel sends buffered data to the turbo channel with retry logic.
// Handles channel backpressure by waiting and retrying. Must be called with mutex held.
func (w *TurboNetworkWriter) sendToTurboChannel() error {
- if w.handler.turboLines == nil {
+ turboCh := w.handler.GetTurboChannel()
+ if turboCh == nil {
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "turboLines channel is nil")
w.writeBuf.Reset()
return nil
@@ -434,7 +435,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// Send data to turbo channel, retry once if full
select {
- case w.handler.turboLines <- data:
+ case turboCh <- data:
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
w.writeBuf.Reset()
return nil
@@ -442,7 +443,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// Channel full, wait a bit and retry
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
time.Sleep(time.Millisecond)
- w.handler.turboLines <- data
+ turboCh <- data
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry")
w.writeBuf.Reset()
return nil
@@ -467,41 +468,42 @@ func (w *TurboNetworkWriter) WriteServerMessage(message string) error {
// Flush ensures all data is written
func (w *TurboNetworkWriter) Flush() error {
dlog.Server.Trace("TurboNetworkWriter.Flush", "called")
-
+
w.mutex.Lock()
defer w.mutex.Unlock()
-
+
// If we have any buffered data, send it now
if w.writeBuf.Len() > 0 {
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len())
-
- if w.handler.turboLines != nil {
+
+ turboCh := w.handler.GetTurboChannel()
+ if turboCh != nil {
data := make([]byte, w.writeBuf.Len())
copy(data, w.writeBuf.Bytes())
-
+
// Force send the data
- w.handler.turboLines <- data
+ turboCh <- data
w.writeBuf.Reset()
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
}
}
-
+
// Wait for the channel to have some space to ensure data is being processed
// Don't close the EOF channel here as it may be used for multiple files
- if w.handler.turboLines != nil {
+ if w.handler.GetTurboChannel() != nil {
// Wait until channel has been drained somewhat
- for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ {
- dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines))
+ for i := 0; i < 100 && w.handler.TurboChannelLen() > 900; i++ {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", w.handler.TurboChannelLen())
time.Sleep(10 * time.Millisecond)
}
- dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.handler.turboLines))
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", w.handler.TurboChannelLen())
}
-
+
// Wait a bit to ensure data is processed
// This is crucial for integration tests
time.Sleep(10 * time.Millisecond)
dlog.Server.Trace("TurboNetworkWriter.Flush", "completed")
-
+
return nil
}