diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 09:02:34 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 09:02:34 +0200 |
| commit | 174bd919ab58e15a1841df428025ea9cc8ef7e3a (patch) | |
| tree | 80264b611389cfca486384887c9324eaac34e98e /internal/server/handlers/basehandler.go | |
| parent | 50a40f6e77e9f9a6f65e0596c789f67b91f6a6e1 (diff) | |
Extract protocol and turbo responsibilities from baseHandler (task 327)
Diffstat (limited to 'internal/server/handlers/basehandler.go')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 196 |
1 files changed, 37 insertions, 159 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 3bb824b..5e9f1ee 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -3,11 +3,8 @@ package handlers import ( "bytes" "context" - "encoding/base64" - "errors" "fmt" "io" - "strconv" "strings" "sync" "sync/atomic" @@ -31,13 +28,14 @@ type baseHandler struct { handleCommandCb handleCommandCb lines chan *line.Line aggregate *server.Aggregate - turboAggregate *server.TurboAggregate // Turbo mode aggregate + turboAggregate *server.TurboAggregate // Turbo mode aggregate maprMessages chan string serverMessages chan string hostname string user *user.User ackCloseReceived chan struct{} activeCommands int32 + codec protocolCodec readBuf bytes.Buffer writeBuf bytes.Buffer @@ -47,12 +45,8 @@ type baseHandler struct { quiet bool plain bool serverless bool - - // Turbo mode support - turboMode bool - turboLines chan []byte // Pre-formatted lines for turbo mode - turboBuffer []byte // Buffer for partially sent turbo data - turboEOF chan struct{} // Signal when turbo data is complete + + turbo turboManager } // Shutdown the handler. @@ -79,66 +73,8 @@ func (h *baseHandler) Done() <-chan struct{} { func (h *baseHandler) Read(p []byte) (n int, err error) { defer h.readBuf.Reset() - // In turbo mode, check if we have buffered data first - if h.turboMode && len(h.turboBuffer) > 0 { - dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer)) - n = copy(p, h.turboBuffer) - h.turboBuffer = h.turboBuffer[n:] - dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer)) - return - } - - // In turbo mode, prioritize pre-formatted turbo lines - if h.turboMode && h.turboLines != nil { - channelLen := len(h.turboLines) - dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen) - - // Try to read from the channel - select { - case turboData := <-h.turboLines: - dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) - n = copy(p, turboData) - // If we couldn't send all data, buffer the rest - if n < len(turboData) { - h.turboBuffer = turboData[n:] - dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer)) - } - return - default: - // No data immediately available - if channelLen > 0 { - // There's data in the channel but we couldn't get it immediately - // Wait a bit and try again - dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting") - time.Sleep(time.Millisecond) - select { - case turboData := <-h.turboLines: - dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) - n = copy(p, turboData) - if n < len(turboData) { - h.turboBuffer = turboData[n:] - } - return - default: - // Still no data - } - } - - // Channel is truly empty, check if we should continue in turbo mode - // Only disable turbo mode if we've been signaled to do so - if h.turboEOF != nil { - select { - case <-h.turboEOF: - dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") - h.turboMode = false - default: - // EOF not signaled yet, continue in turbo mode - } - } - - dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through") - // Fall through to normal processing - } + if n, handled := h.turbo.tryRead(p, h.user); handled { + return n, nil } select { @@ -264,54 +200,11 @@ func (h *baseHandler) handleCommand(commandStr string) { } func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { - argc := len(args) - var add string - - if argc <= 2 || args[0] != "protocol" { - return args, argc, add, errors.New("unable to determine protocol version") - } - - if args[1] != protocol.ProtocolCompat { - clientCompat, _ := strconv.Atoi(args[1]) - serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat) - if clientCompat <= 3 { - // Protocol version 3 or lower expect a newline as message separator - // One day (after 2 major versions) this exception may be removed! - add = "\n" - } - - toUpdate := "client" - if clientCompat > serverCompat { - toUpdate = "server" - } - err := fmt.Errorf("the DTail server protocol version '%s' does not match "+ - "client protocol version '%s', please update DTail %s", - protocol.ProtocolCompat, args[1], toUpdate) - return args, argc, add, err - } - - return args[2:], argc - 2, add, nil + return h.codec.handleProtocolVersion(args) } func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, error) { - err := errors.New("unable to decode client message, DTail server and client " + - "versions may not be compatible") - if argc != 2 || args[0] != "base64" { - return args, argc, err - } - - decoded, err := base64.StdEncoding.DecodeString(args[1]) - if err != nil { - return args, argc, err - } - decodedStr := string(decoded) - - args = strings.Split(decodedStr, " ") - argc = len(args) - dlog.Server.Trace(h.user, "Base64 decoded received command", - decodedStr, argc, args) - - return args, argc, nil + return h.codec.handleBase64(args, argc) } func (h *baseHandler) handleAckCommand(argc int, args []string) { @@ -370,24 +263,21 @@ func (h *baseHandler) flush() { lineCount := len(h.lines) serverCount := len(h.serverMessages) maprCount := len(h.maprMessages) - turboCount := 0 - if h.turboLines != nil { - turboCount = len(h.turboLines) - } + turboCount := h.turbo.channelLen() dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount) return lineCount + serverCount + maprCount + turboCount } - + // Increase iterations for turbo mode to handle large file batches maxIterations := 100 - if h.turboMode { + if h.turbo.enabled() { maxIterations = 300 // Give more time for turbo mode to drain } // Also increase iterations if we have MapReduce messages if h.turboAggregate != nil || h.aggregate != nil { maxIterations = 300 // Give more time for MapReduce results } - + for i := 0; i < maxIterations; i++ { if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) @@ -402,13 +292,13 @@ func (h *baseHandler) flush() { func (h *baseHandler) shutdown() { // Log current state at shutdown activeCommands := atomic.LoadInt32(&h.activeCommands) - dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode) - + dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turbo.enabled()) + // In turbo mode, ensure all data is flushed before shutdown - if h.turboMode { + if h.turbo.enabled() { h.flushTurboData() } - + // Shutdown aggregates BEFORE flush to ensure MapReduce data is available if h.turboAggregate != nil { dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()") @@ -422,7 +312,7 @@ func (h *baseHandler) shutdown() { // Give time for serialization to complete time.Sleep(100 * time.Millisecond) } - + h.flush() go func() { @@ -452,47 +342,35 @@ func (h *baseHandler) decrementActiveCommands() int32 { // EnableTurboMode enables turbo mode for direct line processing func (h *baseHandler) EnableTurboMode() { - h.turboMode = true - if h.turboLines == nil { - h.turboLines = make(chan []byte, 1000) // Large buffer for performance - } - // Always create a new turboEOF channel for each batch of files - // This ensures proper synchronization when processing multiple file batches - h.turboEOF = make(chan struct{}) + h.turbo.enable() } // IsTurboMode returns true if turbo mode is enabled func (h *baseHandler) IsTurboMode() bool { - return h.turboMode + return h.turbo.enabled() +} + +// HasTurboEOF returns true when a turbo EOF channel exists. +func (h *baseHandler) HasTurboEOF() bool { + return h.turbo.hasEOF() +} + +// SignalTurboEOF closes turbo EOF channel once. +func (h *baseHandler) SignalTurboEOF() { + h.turbo.signalEOF() } // flushTurboData ensures all turbo channel data is processed func (h *baseHandler) flushTurboData() { - if h.turboLines == nil { - return - } - - dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines)) - - // Wait for turbo channel to drain with a timeout - timeout := time.After(2 * time.Second) - for { - select { - case <-timeout: - dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines)) - return - default: - if len(h.turboLines) == 0 { - dlog.Server.Debug(h.user, "Turbo channel drained successfully") - return - } - // Give the reader time to process - time.Sleep(10 * time.Millisecond) - } - } + h.turbo.flush(h.user) } // GetTurboChannel returns the turbo lines channel for direct writing -func (h *baseHandler) GetTurboChannel() chan<- []byte { - return h.turboLines +func (h *baseHandler) GetTurboChannel() chan []byte { + return h.turbo.channel() +} + +// TurboChannelLen returns current turbo channel buffered size. +func (h *baseHandler) TurboChannelLen() int { + return h.turbo.channelLen() } |
