summaryrefslogtreecommitdiff
path: root/internal/server/handlers/basehandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 09:02:34 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 09:02:34 +0200
commit174bd919ab58e15a1841df428025ea9cc8ef7e3a (patch)
tree80264b611389cfca486384887c9324eaac34e98e /internal/server/handlers/basehandler.go
parent50a40f6e77e9f9a6f65e0596c789f67b91f6a6e1 (diff)
Extract protocol and turbo responsibilities from baseHandler (task 327)
Diffstat (limited to 'internal/server/handlers/basehandler.go')
-rw-r--r--internal/server/handlers/basehandler.go196
1 files changed, 37 insertions, 159 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 3bb824b..5e9f1ee 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -3,11 +3,8 @@ package handlers
import (
"bytes"
"context"
- "encoding/base64"
- "errors"
"fmt"
"io"
- "strconv"
"strings"
"sync"
"sync/atomic"
@@ -31,13 +28,14 @@ type baseHandler struct {
handleCommandCb handleCommandCb
lines chan *line.Line
aggregate *server.Aggregate
- turboAggregate *server.TurboAggregate // Turbo mode aggregate
+ turboAggregate *server.TurboAggregate // Turbo mode aggregate
maprMessages chan string
serverMessages chan string
hostname string
user *user.User
ackCloseReceived chan struct{}
activeCommands int32
+ codec protocolCodec
readBuf bytes.Buffer
writeBuf bytes.Buffer
@@ -47,12 +45,8 @@ type baseHandler struct {
quiet bool
plain bool
serverless bool
-
- // Turbo mode support
- turboMode bool
- turboLines chan []byte // Pre-formatted lines for turbo mode
- turboBuffer []byte // Buffer for partially sent turbo data
- turboEOF chan struct{} // Signal when turbo data is complete
+
+ turbo turboManager
}
// Shutdown the handler.
@@ -79,66 +73,8 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
- // In turbo mode, check if we have buffered data first
- if h.turboMode && len(h.turboBuffer) > 0 {
- dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer))
- n = copy(p, h.turboBuffer)
- h.turboBuffer = h.turboBuffer[n:]
- dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer))
- return
- }
-
- // In turbo mode, prioritize pre-formatted turbo lines
- if h.turboMode && h.turboLines != nil {
- channelLen := len(h.turboLines)
- dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
-
- // Try to read from the channel
- select {
- case turboData := <-h.turboLines:
- dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
- n = copy(p, turboData)
- // If we couldn't send all data, buffer the rest
- if n < len(turboData) {
- h.turboBuffer = turboData[n:]
- dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer))
- }
- return
- default:
- // No data immediately available
- if channelLen > 0 {
- // There's data in the channel but we couldn't get it immediately
- // Wait a bit and try again
- dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting")
- time.Sleep(time.Millisecond)
- select {
- case turboData := <-h.turboLines:
- dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
- n = copy(p, turboData)
- if n < len(turboData) {
- h.turboBuffer = turboData[n:]
- }
- return
- default:
- // Still no data
- }
- }
-
- // Channel is truly empty, check if we should continue in turbo mode
- // Only disable turbo mode if we've been signaled to do so
- if h.turboEOF != nil {
- select {
- case <-h.turboEOF:
- dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
- h.turboMode = false
- default:
- // EOF not signaled yet, continue in turbo mode
- }
- }
-
- dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through")
- // Fall through to normal processing
- }
+ if n, handled := h.turbo.tryRead(p, h.user); handled {
+ return n, nil
}
select {
@@ -264,54 +200,11 @@ func (h *baseHandler) handleCommand(commandStr string) {
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
- argc := len(args)
- var add string
-
- if argc <= 2 || args[0] != "protocol" {
- return args, argc, add, errors.New("unable to determine protocol version")
- }
-
- if args[1] != protocol.ProtocolCompat {
- clientCompat, _ := strconv.Atoi(args[1])
- serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat)
- if clientCompat <= 3 {
- // Protocol version 3 or lower expect a newline as message separator
- // One day (after 2 major versions) this exception may be removed!
- add = "\n"
- }
-
- toUpdate := "client"
- if clientCompat > serverCompat {
- toUpdate = "server"
- }
- err := fmt.Errorf("the DTail server protocol version '%s' does not match "+
- "client protocol version '%s', please update DTail %s",
- protocol.ProtocolCompat, args[1], toUpdate)
- return args, argc, add, err
- }
-
- return args[2:], argc - 2, add, nil
+ return h.codec.handleProtocolVersion(args)
}
func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, error) {
- err := errors.New("unable to decode client message, DTail server and client " +
- "versions may not be compatible")
- if argc != 2 || args[0] != "base64" {
- return args, argc, err
- }
-
- decoded, err := base64.StdEncoding.DecodeString(args[1])
- if err != nil {
- return args, argc, err
- }
- decodedStr := string(decoded)
-
- args = strings.Split(decodedStr, " ")
- argc = len(args)
- dlog.Server.Trace(h.user, "Base64 decoded received command",
- decodedStr, argc, args)
-
- return args, argc, nil
+ return h.codec.handleBase64(args, argc)
}
func (h *baseHandler) handleAckCommand(argc int, args []string) {
@@ -370,24 +263,21 @@ func (h *baseHandler) flush() {
lineCount := len(h.lines)
serverCount := len(h.serverMessages)
maprCount := len(h.maprMessages)
- turboCount := 0
- if h.turboLines != nil {
- turboCount = len(h.turboLines)
- }
+ turboCount := h.turbo.channelLen()
dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount)
return lineCount + serverCount + maprCount + turboCount
}
-
+
// Increase iterations for turbo mode to handle large file batches
maxIterations := 100
- if h.turboMode {
+ if h.turbo.enabled() {
maxIterations = 300 // Give more time for turbo mode to drain
}
// Also increase iterations if we have MapReduce messages
if h.turboAggregate != nil || h.aggregate != nil {
maxIterations = 300 // Give more time for MapReduce results
}
-
+
for i := 0; i < maxIterations; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
@@ -402,13 +292,13 @@ func (h *baseHandler) flush() {
func (h *baseHandler) shutdown() {
// Log current state at shutdown
activeCommands := atomic.LoadInt32(&h.activeCommands)
- dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode)
-
+ dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turbo.enabled())
+
// In turbo mode, ensure all data is flushed before shutdown
- if h.turboMode {
+ if h.turbo.enabled() {
h.flushTurboData()
}
-
+
// Shutdown aggregates BEFORE flush to ensure MapReduce data is available
if h.turboAggregate != nil {
dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()")
@@ -422,7 +312,7 @@ func (h *baseHandler) shutdown() {
// Give time for serialization to complete
time.Sleep(100 * time.Millisecond)
}
-
+
h.flush()
go func() {
@@ -452,47 +342,35 @@ func (h *baseHandler) decrementActiveCommands() int32 {
// EnableTurboMode enables turbo mode for direct line processing
func (h *baseHandler) EnableTurboMode() {
- h.turboMode = true
- if h.turboLines == nil {
- h.turboLines = make(chan []byte, 1000) // Large buffer for performance
- }
- // Always create a new turboEOF channel for each batch of files
- // This ensures proper synchronization when processing multiple file batches
- h.turboEOF = make(chan struct{})
+ h.turbo.enable()
}
// IsTurboMode returns true if turbo mode is enabled
func (h *baseHandler) IsTurboMode() bool {
- return h.turboMode
+ return h.turbo.enabled()
+}
+
+// HasTurboEOF returns true when a turbo EOF channel exists.
+func (h *baseHandler) HasTurboEOF() bool {
+ return h.turbo.hasEOF()
+}
+
+// SignalTurboEOF closes turbo EOF channel once.
+func (h *baseHandler) SignalTurboEOF() {
+ h.turbo.signalEOF()
}
// flushTurboData ensures all turbo channel data is processed
func (h *baseHandler) flushTurboData() {
- if h.turboLines == nil {
- return
- }
-
- dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines))
-
- // Wait for turbo channel to drain with a timeout
- timeout := time.After(2 * time.Second)
- for {
- select {
- case <-timeout:
- dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines))
- return
- default:
- if len(h.turboLines) == 0 {
- dlog.Server.Debug(h.user, "Turbo channel drained successfully")
- return
- }
- // Give the reader time to process
- time.Sleep(10 * time.Millisecond)
- }
- }
+ h.turbo.flush(h.user)
}
// GetTurboChannel returns the turbo lines channel for direct writing
-func (h *baseHandler) GetTurboChannel() chan<- []byte {
- return h.turboLines
+func (h *baseHandler) GetTurboChannel() chan []byte {
+ return h.turbo.channel()
+}
+
+// TurboChannelLen returns current turbo channel buffered size.
+func (h *baseHandler) TurboChannelLen() int {
+ return h.turbo.channelLen()
}