diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 09:02:34 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 09:02:34 +0200 |
| commit | 174bd919ab58e15a1841df428025ea9cc8ef7e3a (patch) | |
| tree | 80264b611389cfca486384887c9324eaac34e98e /internal | |
| parent | 50a40f6e77e9f9a6f65e0596c789f67b91f6a6e1 (diff) | |
Extract protocol and turbo responsibilities from baseHandler (task 327)
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 196 | ||||
| -rw-r--r-- | internal/server/handlers/healthhandler.go | 1 | ||||
| -rw-r--r-- | internal/server/handlers/protocol_codec.go | 72 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 11 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 1 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_manager.go | 142 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 38 |
7 files changed, 276 insertions, 185 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 3bb824b..5e9f1ee 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -3,11 +3,8 @@ package handlers import ( "bytes" "context" - "encoding/base64" - "errors" "fmt" "io" - "strconv" "strings" "sync" "sync/atomic" @@ -31,13 +28,14 @@ type baseHandler struct { handleCommandCb handleCommandCb lines chan *line.Line aggregate *server.Aggregate - turboAggregate *server.TurboAggregate // Turbo mode aggregate + turboAggregate *server.TurboAggregate // Turbo mode aggregate maprMessages chan string serverMessages chan string hostname string user *user.User ackCloseReceived chan struct{} activeCommands int32 + codec protocolCodec readBuf bytes.Buffer writeBuf bytes.Buffer @@ -47,12 +45,8 @@ type baseHandler struct { quiet bool plain bool serverless bool - - // Turbo mode support - turboMode bool - turboLines chan []byte // Pre-formatted lines for turbo mode - turboBuffer []byte // Buffer for partially sent turbo data - turboEOF chan struct{} // Signal when turbo data is complete + + turbo turboManager } // Shutdown the handler. @@ -79,66 +73,8 @@ func (h *baseHandler) Done() <-chan struct{} { func (h *baseHandler) Read(p []byte) (n int, err error) { defer h.readBuf.Reset() - // In turbo mode, check if we have buffered data first - if h.turboMode && len(h.turboBuffer) > 0 { - dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer)) - n = copy(p, h.turboBuffer) - h.turboBuffer = h.turboBuffer[n:] - dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer)) - return - } - - // In turbo mode, prioritize pre-formatted turbo lines - if h.turboMode && h.turboLines != nil { - channelLen := len(h.turboLines) - dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen) - - // Try to read from the channel - select { - case turboData := <-h.turboLines: - dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) - n = copy(p, turboData) - // If we couldn't send all data, buffer the rest - if n < len(turboData) { - h.turboBuffer = turboData[n:] - dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer)) - } - return - default: - // No data immediately available - if channelLen > 0 { - // There's data in the channel but we couldn't get it immediately - // Wait a bit and try again - dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting") - time.Sleep(time.Millisecond) - select { - case turboData := <-h.turboLines: - dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) - n = copy(p, turboData) - if n < len(turboData) { - h.turboBuffer = turboData[n:] - } - return - default: - // Still no data - } - } - - // Channel is truly empty, check if we should continue in turbo mode - // Only disable turbo mode if we've been signaled to do so - if h.turboEOF != nil { - select { - case <-h.turboEOF: - dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") - h.turboMode = false - default: - // EOF not signaled yet, continue in turbo mode - } - } - - dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through") - // Fall through to normal processing - } + if n, handled := h.turbo.tryRead(p, h.user); handled { + return n, nil } select { @@ -264,54 +200,11 @@ func (h *baseHandler) handleCommand(commandStr string) { } func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { - argc := len(args) - var add string - - if argc <= 2 || args[0] != "protocol" { - return args, argc, add, errors.New("unable to determine protocol version") - } - - if args[1] != protocol.ProtocolCompat { - clientCompat, _ := strconv.Atoi(args[1]) - serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat) - if clientCompat <= 3 { - // Protocol version 3 or lower expect a newline as message separator - // One day (after 2 major versions) this exception may be removed! - add = "\n" - } - - toUpdate := "client" - if clientCompat > serverCompat { - toUpdate = "server" - } - err := fmt.Errorf("the DTail server protocol version '%s' does not match "+ - "client protocol version '%s', please update DTail %s", - protocol.ProtocolCompat, args[1], toUpdate) - return args, argc, add, err - } - - return args[2:], argc - 2, add, nil + return h.codec.handleProtocolVersion(args) } func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, error) { - err := errors.New("unable to decode client message, DTail server and client " + - "versions may not be compatible") - if argc != 2 || args[0] != "base64" { - return args, argc, err - } - - decoded, err := base64.StdEncoding.DecodeString(args[1]) - if err != nil { - return args, argc, err - } - decodedStr := string(decoded) - - args = strings.Split(decodedStr, " ") - argc = len(args) - dlog.Server.Trace(h.user, "Base64 decoded received command", - decodedStr, argc, args) - - return args, argc, nil + return h.codec.handleBase64(args, argc) } func (h *baseHandler) handleAckCommand(argc int, args []string) { @@ -370,24 +263,21 @@ func (h *baseHandler) flush() { lineCount := len(h.lines) serverCount := len(h.serverMessages) maprCount := len(h.maprMessages) - turboCount := 0 - if h.turboLines != nil { - turboCount = len(h.turboLines) - } + turboCount := h.turbo.channelLen() dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount) return lineCount + serverCount + maprCount + turboCount } - + // Increase iterations for turbo mode to handle large file batches maxIterations := 100 - if h.turboMode { + if h.turbo.enabled() { maxIterations = 300 // Give more time for turbo mode to drain } // Also increase iterations if we have MapReduce messages if h.turboAggregate != nil || h.aggregate != nil { maxIterations = 300 // Give more time for MapReduce results } - + for i := 0; i < maxIterations; i++ { if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) @@ -402,13 +292,13 @@ func (h *baseHandler) flush() { func (h *baseHandler) shutdown() { // Log current state at shutdown activeCommands := atomic.LoadInt32(&h.activeCommands) - dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode) - + dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turbo.enabled()) + // In turbo mode, ensure all data is flushed before shutdown - if h.turboMode { + if h.turbo.enabled() { h.flushTurboData() } - + // Shutdown aggregates BEFORE flush to ensure MapReduce data is available if h.turboAggregate != nil { dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()") @@ -422,7 +312,7 @@ func (h *baseHandler) shutdown() { // Give time for serialization to complete time.Sleep(100 * time.Millisecond) } - + h.flush() go func() { @@ -452,47 +342,35 @@ func (h *baseHandler) decrementActiveCommands() int32 { // EnableTurboMode enables turbo mode for direct line processing func (h *baseHandler) EnableTurboMode() { - h.turboMode = true - if h.turboLines == nil { - h.turboLines = make(chan []byte, 1000) // Large buffer for performance - } - // Always create a new turboEOF channel for each batch of files - // This ensures proper synchronization when processing multiple file batches - h.turboEOF = make(chan struct{}) + h.turbo.enable() } // IsTurboMode returns true if turbo mode is enabled func (h *baseHandler) IsTurboMode() bool { - return h.turboMode + return h.turbo.enabled() +} + +// HasTurboEOF returns true when a turbo EOF channel exists. +func (h *baseHandler) HasTurboEOF() bool { + return h.turbo.hasEOF() +} + +// SignalTurboEOF closes turbo EOF channel once. +func (h *baseHandler) SignalTurboEOF() { + h.turbo.signalEOF() } // flushTurboData ensures all turbo channel data is processed func (h *baseHandler) flushTurboData() { - if h.turboLines == nil { - return - } - - dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines)) - - // Wait for turbo channel to drain with a timeout - timeout := time.After(2 * time.Second) - for { - select { - case <-timeout: - dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines)) - return - default: - if len(h.turboLines) == 0 { - dlog.Server.Debug(h.user, "Turbo channel drained successfully") - return - } - // Give the reader time to process - time.Sleep(10 * time.Millisecond) - } - } + h.turbo.flush(h.user) } // GetTurboChannel returns the turbo lines channel for direct writing -func (h *baseHandler) GetTurboChannel() chan<- []byte { - return h.turboLines +func (h *baseHandler) GetTurboChannel() chan []byte { + return h.turbo.channel() +} + +// TurboChannelLen returns current turbo channel buffered size. +func (h *baseHandler) TurboChannelLen() int { + return h.turbo.channelLen() } diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go index 362fe24..5abb3f4 100644 --- a/internal/server/handlers/healthhandler.go +++ b/internal/server/handlers/healthhandler.go @@ -28,6 +28,7 @@ func NewHealthHandler(user *user.User) *HealthHandler { maprMessages: make(chan string, 10), ackCloseReceived: make(chan struct{}), user: user, + codec: newProtocolCodec(user), }, } h.handleCommandCb = h.handleHealthCommand diff --git a/internal/server/handlers/protocol_codec.go b/internal/server/handlers/protocol_codec.go new file mode 100644 index 0000000..192cc81 --- /dev/null +++ b/internal/server/handlers/protocol_codec.go @@ -0,0 +1,72 @@ +package handlers + +import ( + "encoding/base64" + "errors" + "fmt" + "strconv" + "strings" + + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" + user "github.com/mimecast/dtail/internal/user/server" +) + +type protocolCodec struct { + user *user.User +} + +func newProtocolCodec(user *user.User) protocolCodec { + return protocolCodec{user: user} +} + +func (c protocolCodec) handleProtocolVersion(args []string) ([]string, int, string, error) { + argc := len(args) + var add string + + if argc <= 2 || args[0] != "protocol" { + return args, argc, add, errors.New("unable to determine protocol version") + } + + if args[1] != protocol.ProtocolCompat { + clientCompat, _ := strconv.Atoi(args[1]) + serverCompat, _ := strconv.Atoi(protocol.ProtocolCompat) + if clientCompat <= 3 { + // Protocol version 3 or lower expect a newline as message separator + // One day (after 2 major versions) this exception may be removed! + add = "\n" + } + + toUpdate := "client" + if clientCompat > serverCompat { + toUpdate = "server" + } + err := fmt.Errorf("the DTail server protocol version '%s' does not match "+ + "client protocol version '%s', please update DTail %s", + protocol.ProtocolCompat, args[1], toUpdate) + return args, argc, add, err + } + + return args[2:], argc - 2, add, nil +} + +func (c protocolCodec) handleBase64(args []string, argc int) ([]string, int, error) { + err := errors.New("unable to decode client message, DTail server and client " + + "versions may not be compatible") + if argc != 2 || args[0] != "base64" { + return args, argc, err + } + + decoded, err := base64.StdEncoding.DecodeString(args[1]) + if err != nil { + return args, argc, err + } + decodedStr := string(decoded) + + args = strings.Split(decodedStr, " ") + argc = len(args) + dlog.Server.Trace(c.user, "Base64 decoded received command", + decodedStr, argc, args) + + return args, argc, nil +} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 0375807..3410499 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -124,19 +124,14 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, // This is crucial for proper shutdown in server mode if !r.server.serverCfg.TurboBoostDisable && r.server.aggregate == nil && (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { - if r.server.IsTurboMode() && r.server.turboEOF != nil { + if r.server.IsTurboMode() && r.server.HasTurboEOF() { dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal") // Ensure all turbo data is flushed before signaling EOF r.server.flushTurboData() - // Signal EOF by closing the channel, but only if it hasn't been closed yet - select { - case <-r.server.turboEOF: - // Already closed - default: - close(r.server.turboEOF) - } + // Signal EOF by closing the channel, but only once. + r.server.SignalTurboEOF() // Wait to ensure all data is transmitted // This is especially important when files are queued due to concurrency limits diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 92619d7..f40081e 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -46,6 +46,7 @@ func NewServerHandler(user *user.User, catLimiter, maprMessages: make(chan string, 10), ackCloseReceived: make(chan struct{}), user: user, + codec: newProtocolCodec(user), }, catLimiter: catLimiter, tailLimiter: tailLimiter, diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go new file mode 100644 index 0000000..7fad042 --- /dev/null +++ b/internal/server/handlers/turbo_manager.go @@ -0,0 +1,142 @@ +package handlers + +import ( + "time" + + "github.com/mimecast/dtail/internal/io/dlog" + user "github.com/mimecast/dtail/internal/user/server" +) + +type turboManager struct { + mode bool + lines chan []byte + buffer []byte + eof chan struct{} +} + +func (t *turboManager) enable() { + t.mode = true + if t.lines == nil { + t.lines = make(chan []byte, 1000) // Large buffer for performance + } + // Always create a new EOF channel for each batch of files. + t.eof = make(chan struct{}) +} + +func (t *turboManager) enabled() bool { + return t.mode +} + +func (t *turboManager) hasEOF() bool { + return t.eof != nil +} + +func (t *turboManager) signalEOF() { + if t.eof == nil { + return + } + + select { + case <-t.eof: + // Already closed + default: + close(t.eof) + } +} + +func (t *turboManager) channel() chan []byte { + return t.lines +} + +func (t *turboManager) channelLen() int { + if t.lines == nil { + return 0 + } + return len(t.lines) +} + +func (t *turboManager) flush(user *user.User) { + if t.lines == nil { + return + } + + dlog.Server.Debug(user, "Flushing turbo data", "channelLen", len(t.lines)) + + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + dlog.Server.Warn(user, "Timeout while flushing turbo data", "remaining", len(t.lines)) + return + default: + if len(t.lines) == 0 { + dlog.Server.Debug(user, "Turbo channel drained successfully") + return + } + // Give the reader time to process. + time.Sleep(10 * time.Millisecond) + } + } +} + +// tryRead tries to serve data from turbo state and channels. +// Returns handled=false when caller should continue with normal path. +func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) { + if !t.mode { + return 0, false + } + + if len(t.buffer) > 0 { + dlog.Server.Trace(user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(t.buffer)) + n = copy(p, t.buffer) + t.buffer = t.buffer[n:] + dlog.Server.Trace(user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(t.buffer)) + return n, true + } + + if t.lines == nil { + return 0, false + } + + channelLen := len(t.lines) + dlog.Server.Trace(user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen) + + select { + case turboData := <-t.lines: + dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) + n = copy(p, turboData) + if n < len(turboData) { + t.buffer = turboData[n:] + dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer)) + } + return n, true + default: + if channelLen > 0 { + dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting") + time.Sleep(time.Millisecond) + select { + case turboData := <-t.lines: + dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) + n = copy(p, turboData) + if n < len(turboData) { + t.buffer = turboData[n:] + } + return n, true + default: + // Still no data. + } + } + + if t.eof != nil { + select { + case <-t.eof: + dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") + t.mode = false + default: + } + } + + dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through") + return 0, false + } +} diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index 62225bd..d867671 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -200,7 +200,7 @@ func (w *DirectTurboWriter) Flush() error { // Force flush any remaining data err := w.flushBuffer() - + // For serverless mode, ensure everything is written to output if w.serverless { // Ensure writer is flushed if it supports it @@ -208,7 +208,7 @@ func (w *DirectTurboWriter) Flush() error { flusher.Flush() } } - + return err } @@ -421,7 +421,8 @@ func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, s // sendToTurboChannel sends buffered data to the turbo channel with retry logic. // Handles channel backpressure by waiting and retrying. Must be called with mutex held. func (w *TurboNetworkWriter) sendToTurboChannel() error { - if w.handler.turboLines == nil { + turboCh := w.handler.GetTurboChannel() + if turboCh == nil { dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "turboLines channel is nil") w.writeBuf.Reset() return nil @@ -434,7 +435,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // Send data to turbo channel, retry once if full select { - case w.handler.turboLines <- data: + case turboCh <- data: dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully") w.writeBuf.Reset() return nil @@ -442,7 +443,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // Channel full, wait a bit and retry dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry") time.Sleep(time.Millisecond) - w.handler.turboLines <- data + turboCh <- data dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry") w.writeBuf.Reset() return nil @@ -467,41 +468,42 @@ func (w *TurboNetworkWriter) WriteServerMessage(message string) error { // Flush ensures all data is written func (w *TurboNetworkWriter) Flush() error { dlog.Server.Trace("TurboNetworkWriter.Flush", "called") - + w.mutex.Lock() defer w.mutex.Unlock() - + // If we have any buffered data, send it now if w.writeBuf.Len() > 0 { dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len()) - - if w.handler.turboLines != nil { + + turboCh := w.handler.GetTurboChannel() + if turboCh != nil { data := make([]byte, w.writeBuf.Len()) copy(data, w.writeBuf.Bytes()) - + // Force send the data - w.handler.turboLines <- data + turboCh <- data w.writeBuf.Reset() dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel") } } - + // Wait for the channel to have some space to ensure data is being processed // Don't close the EOF channel here as it may be used for multiple files - if w.handler.turboLines != nil { + if w.handler.GetTurboChannel() != nil { // Wait until channel has been drained somewhat - for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ { - dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines)) + for i := 0; i < 100 && w.handler.TurboChannelLen() > 900; i++ { + dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", w.handler.TurboChannelLen()) time.Sleep(10 * time.Millisecond) } - dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", len(w.handler.turboLines)) + dlog.Server.Trace("TurboNetworkWriter.Flush", "channel status", "channelLen", w.handler.TurboChannelLen()) } - + // Wait a bit to ensure data is processed // This is crucial for integration tests time.Sleep(10 * time.Millisecond) dlog.Server.Trace("TurboNetworkWriter.Flush", "completed") - + return nil } |
