From 7a79d0a8bf58b05dfbae331d00275739530b9584 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 13 Mar 2026 09:13:51 +0200 Subject: task 682e6ae9: filter stale generation output --- internal/io/line/line.go | 4 + internal/server/handlers/basehandler.go | 166 +++++++++++++-------- internal/server/handlers/generation_output.go | 75 ++++++++++ internal/server/handlers/generation_output_test.go | 115 ++++++++++++++ internal/server/handlers/readcommand.go | 100 ++++++++++--- internal/server/handlers/readcommand_server.go | 6 + internal/server/handlers/serverhandler.go | 27 +++- internal/server/handlers/sessioncommand.go | 8 + internal/server/handlers/turbo_manager.go | 102 +++++++------ internal/server/handlers/turbo_writer.go | 66 +++++++- 10 files changed, 537 insertions(+), 132 deletions(-) create mode 100644 internal/server/handlers/generation_output.go create mode 100644 internal/server/handlers/generation_output_test.go diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 97e7795..e1d7bf8 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -30,6 +30,8 @@ type Line struct { // directories in case multiple log files with the same basename are // followed. SourceID string + // Session generation this line belongs to. Zero means unscoped/legacy output. + Generation uint64 } // New creaters a new line object. This is a DTail internal helper structure for reading files. @@ -39,6 +41,7 @@ func New(content *bytes.Buffer, count uint64, transmittedPerc int, sourceID stri l.Count = count l.TransmittedPerc = transmittedPerc l.SourceID = sourceID + l.Generation = 0 return l } @@ -72,4 +75,5 @@ func (l *Line) NullValues() { l.Count = 0 l.TransmittedPerc = 0 l.SourceID = "" + l.Generation = 0 } diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 42cc4cc..66c2cb7 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -47,6 +47,8 @@ type baseHandler struct { serverless bool turbo turboManager + + activeGeneration func() uint64 } // Shutdown the handler. @@ -73,83 +75,108 @@ func (h *baseHandler) Done() <-chan struct{} { func (h *baseHandler) Read(p []byte) (n int, err error) { defer h.readBuf.Reset() - if n, handled := h.turbo.tryRead(p, h.user); handled { - return n, nil - } - - pollInterval := time.Second - if h.turbo.enabled() { - // Turbo reads require tighter wake-ups so we can continue draining the turbo channel. - pollInterval = h.turbo.resolvedReadRetryInterval() - } - poll := time.After(pollInterval) - - select { - case message := <-h.serverMessages: - if len(message) > 0 && message[0] == '.' { - // Handle hidden message (don't display to the user) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - return + for { + if n, handled := h.turbo.tryRead(p, h.user, h.shouldDropGeneration); handled { + if n == 0 { + continue + } + return n, nil } - if h.serverless { - return + pollInterval := time.Second + if h.turbo.enabled() { + // Turbo reads require tighter wake-ups so we can continue draining the turbo channel. + pollInterval = h.turbo.resolvedReadRetryInterval() } + poll := time.After(pollInterval) - // Skip empty server messages when in plain mode - if h.plain && (message == "" || message == "\n") { + select { + case message := <-h.serverMessages: + generation, decodedMessage := decodeGeneratedMessage(message) + if h.shouldDropGeneration(generation) { + continue + } + message = decodedMessage + if len(message) > 0 && message[0] == '.' { + // Handle hidden message (don't display to the user) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + return + } + + if h.serverless { + return + } + + // Skip empty server messages when in plain mode + if h.plain && (message == "" || message == "\n") { + return + } + + // Handle normal server message (display to the user). + formatServerMessage(&h.readBuf, h.hostname, message, h.plain) + n = copy(p, h.readBuf.Bytes()) return - } - // Handle normal server message (display to the user). - formatServerMessage(&h.readBuf, h.hostname, message, h.plain) - n = copy(p, h.readBuf.Bytes()) - - case message := <-h.maprMessages: - // Send mapreduce-aggregated data as a message. - h.readBuf.WriteString("AGGREGATE") - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(h.hostname) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - - case line := <-h.lines: - if h.plain { - h.readBuf.Write(line.Content.Bytes()) + case message := <-h.maprMessages: + generation, decodedMessage := decodeGeneratedMessage(message) + if h.shouldDropGeneration(generation) { + continue + } + message = decodedMessage + // Send mapreduce-aggregated data as a message. + h.readBuf.WriteString("AGGREGATE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) h.readBuf.WriteByte(protocol.MessageDelimiter) - } else { - formatRemoteLine( - &h.readBuf, - h.hostname, - fmt.Sprintf("%3d", line.TransmittedPerc), - line.Count, - line.SourceID, - line.Content.Bytes(), - ) - } - n = copy(p, h.readBuf.Bytes()) - pool.RecycleBytesBuffer(line.Content) - line.Recycle() + n = copy(p, h.readBuf.Bytes()) + return - case <-h.done.Done(): - err = io.EOF - return + case line := <-h.lines: + if line == nil { + continue + } + if h.shouldDropGeneration(line.Generation) { + pool.RecycleBytesBuffer(line.Content) + line.Recycle() + continue + } + if h.plain { + h.readBuf.Write(line.Content.Bytes()) + h.readBuf.WriteByte(protocol.MessageDelimiter) + } else { + formatRemoteLine( + &h.readBuf, + h.hostname, + fmt.Sprintf("%3d", line.TransmittedPerc), + line.Count, + line.SourceID, + line.Content.Bytes(), + ) + } + n = copy(p, h.readBuf.Bytes()) + pool.RecycleBytesBuffer(line.Content) + line.Recycle() + return - case <-poll: - // Wake periodically so turbo mode transitions don't leave this read blocked forever. - select { case <-h.done.Done(): err = io.EOF return - default: + + case <-poll: + // Wake periodically so turbo mode transitions don't leave this read blocked forever. + select { + case <-h.done.Done(): + err = io.EOF + return + default: + } + return } - return } - return } // Write is to receive data from the dtail client via Writer interface. @@ -288,6 +315,19 @@ func (h *baseHandler) sendln(ch chan<- string, message string) { h.send(ch, message+"\n") } +func (h *baseHandler) shouldDropGeneration(generation uint64) bool { + if generation == 0 || h.activeGeneration == nil { + return false + } + + activeGeneration := h.activeGeneration() + if activeGeneration == 0 { + return false + } + + return activeGeneration != generation +} + func (h *baseHandler) flush() { dlog.Server.Trace(h.user, "flush()") numUnsentMessages := func() int { diff --git a/internal/server/handlers/generation_output.go b/internal/server/handlers/generation_output.go new file mode 100644 index 0000000..aa9d195 --- /dev/null +++ b/internal/server/handlers/generation_output.go @@ -0,0 +1,75 @@ +package handlers + +import ( + "context" + "fmt" + "strconv" + "strings" +) + +type sessionGenerationKey struct{} + +const generationOutputPrefix = "\x1egen:" + +func withSessionGeneration(ctx context.Context, generation uint64) context.Context { + if ctx == nil || generation == 0 { + return ctx + } + return context.WithValue(ctx, sessionGenerationKey{}, generation) +} + +func sessionGenerationFromContext(ctx context.Context) uint64 { + if ctx == nil { + return 0 + } + + generation, _ := ctx.Value(sessionGenerationKey{}).(uint64) + return generation +} + +func encodeGeneratedMessage(generation uint64, message string) string { + if generation == 0 { + return message + } + return fmt.Sprintf("%s%d\x1e%s", generationOutputPrefix, generation, message) +} + +func decodeGeneratedMessage(message string) (uint64, string) { + if !strings.HasPrefix(message, generationOutputPrefix) { + return 0, message + } + + rest := strings.TrimPrefix(message, generationOutputPrefix) + parts := strings.SplitN(rest, "\x1e", 2) + if len(parts) != 2 { + return 0, message + } + + generation, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return 0, message + } + + return generation, parts[1] +} + +func encodeGeneratedBytes(generation uint64, payload []byte) []byte { + if generation == 0 { + return payload + } + + prefix := []byte(fmt.Sprintf("%s%d\x1e", generationOutputPrefix, generation)) + data := make([]byte, 0, len(prefix)+len(payload)) + data = append(data, prefix...) + data = append(data, payload...) + return data +} + +func decodeGeneratedBytes(payload []byte) (uint64, []byte) { + message := string(payload) + generation, decoded := decodeGeneratedMessage(message) + if generation == 0 { + return 0, payload + } + return generation, []byte(decoded) +} diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go new file mode 100644 index 0000000..6020c09 --- /dev/null +++ b/internal/server/handlers/generation_output_test.go @@ -0,0 +1,115 @@ +package handlers + +import ( + "bytes" + "strings" + "testing" + + "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/line" + userserver "github.com/mimecast/dtail/internal/user/server" +) + +func TestDecodeGeneratedMessage(t *testing.T) { + generation, message := decodeGeneratedMessage(encodeGeneratedMessage(7, "hello")) + if generation != 7 { + t.Fatalf("unexpected generation: %d", generation) + } + if message != "hello" { + t.Fatalf("unexpected message: %q", message) + } +} + +func TestBaseHandlerReadDropsStaleServerMessage(t *testing.T) { + handler := newGenerationTestHandler(2) + handler.serverMessages <- encodeGeneratedMessage(1, "stale\n") + handler.serverMessages <- encodeGeneratedMessage(2, "fresh\n") + + got := readHandlerOutput(t, &handler) + if strings.Contains(got, "stale") { + t.Fatalf("unexpected stale output: %q", got) + } + if !strings.Contains(got, "fresh") { + t.Fatalf("expected current output, got %q", got) + } +} + +func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) { + handler := newGenerationTestHandler(3) + handler.maprMessages <- encodeGeneratedMessage(2, "old aggregate") + handler.maprMessages <- encodeGeneratedMessage(3, "new aggregate") + + got := readHandlerOutput(t, &handler) + if strings.Contains(got, "old aggregate") { + t.Fatalf("unexpected stale aggregate output: %q", got) + } + if !strings.Contains(got, "new aggregate") { + t.Fatalf("expected current aggregate output, got %q", got) + } +} + +func TestBaseHandlerReadDropsStaleLine(t *testing.T) { + handler := newGenerationTestHandler(4) + + staleLine := line.New(bytes.NewBufferString("stale line"), 1, 100, "app.log") + staleLine.Generation = 3 + currentLine := line.New(bytes.NewBufferString("fresh line"), 2, 100, "app.log") + currentLine.Generation = 4 + + handler.lines <- staleLine + handler.lines <- currentLine + + got := readHandlerOutput(t, &handler) + if strings.Contains(got, "stale line") { + t.Fatalf("unexpected stale line output: %q", got) + } + if !strings.Contains(got, "fresh line") { + t.Fatalf("expected current line output, got %q", got) + } +} + +func TestTurboManagerTryReadDropsStaleGeneration(t *testing.T) { + resetServerLogger(t) + + manager := turboManager{ + mode: true, + lines: make(chan []byte, 2), + } + manager.lines <- encodeGeneratedBytes(1, []byte("stale")) + manager.lines <- encodeGeneratedBytes(2, []byte("fresh")) + + buf := make([]byte, 32) + n, handled := manager.tryRead(buf, &userserver.User{Name: "turbo-test"}, func(generation uint64) bool { + return generation != 0 && generation != 2 + }) + if !handled { + t.Fatalf("expected turbo read to be handled") + } + if got := string(buf[:n]); got != "fresh" { + t.Fatalf("unexpected turbo output: %q", got) + } +} + +func newGenerationTestHandler(activeGeneration uint64) baseHandler { + return baseHandler{ + done: internal.NewDone(), + lines: make(chan *line.Line, 2), + serverMessages: make(chan string, 2), + maprMessages: make(chan string, 2), + hostname: "testhost", + activeGeneration: func() uint64 { + return activeGeneration + }, + } +} + +func readHandlerOutput(t *testing.T, handler *baseHandler) string { + t.Helper() + + buf := make([]byte, 256) + n, err := handler.Read(buf) + if err != nil { + t.Fatalf("Read() error = %v", err) + } + return string(buf[:n]) +} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 493f4b7..9c85889 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -12,6 +12,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" @@ -21,6 +22,7 @@ import ( type readCommand struct { server readCommandServer mode omode.Mode + generation uint64 shutdownCoordinator *shutdownCoordinator } @@ -42,19 +44,20 @@ func newReadCommand(server readCommandServer, mode omode.Mode) *readCommand { func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, argc int, args []string, retries int) { + r.generation = sessionGenerationFromContext(ctx) re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.SendServerMessage(dlog.Server.Error(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Error(r.server.LogContext(), "Unable to parse command", err)) return } re = deserializedRegex } if argc < 3 { - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Unable to parse command", args, argc)) return } @@ -91,7 +94,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, if numPaths := len(paths); numPaths == 0 { dlog.Server.Error(r.server.LogContext(), "No such file(s) to read", glob) - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Unable to read file(s), check server logs")) select { case <-ctx.Done(): @@ -106,7 +109,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, return } - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Giving up to read file(s)")) return } @@ -186,7 +189,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC globID := r.makeGlobID(path, glob) if !r.server.CanReadFile(path) { dlog.Server.Error(r.server.LogContext(), "No permission to read file", path, globID) - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Unable to read file(s), check server logs")) return } @@ -201,16 +204,18 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, var reader fs.FileReader var limiter chan struct{} + serverMessages, closeServerMessages := r.newGeneratedServerMessagesChannel(ctx) + defer closeServerMessages() switch r.mode { case omode.GrepClient, omode.CatClient: - catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) + catFile := fs.NewCatFile(path, globID, serverMessages, r.server.MaxLineLength()) reader = &catFile limiter = r.server.CatLimiter() case omode.TailClient: fallthrough default: - tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) + tailFile := fs.NewTailFile(path, globID, serverMessages, r.server.MaxLineLength()) reader = &tailFile limiter = r.server.TailLimiter() } @@ -316,8 +321,9 @@ func (r *readCommand) readViaChannels() readStrategy { r.server.RegisterAggregateLines(linesCh) closeLines = true } else { - // For non-MapReduce operations, use the server's shared lines channel. - linesCh = r.server.SharedLinesChannel() + // For non-MapReduce operations, forward lines through a generation-aware channel. + linesCh = r.newGeneratedLinesChannel(ctx) + closeLines = true } err := reader.Start(ctx, ltx, linesCh, re) @@ -373,21 +379,23 @@ func (r *readCommand) ensureTurboModeEnabled() { } r.server.EnableTurboMode() // Wake a potentially blocked reader goroutine so it can switch to turbo drain path. - r.server.SendServerMessage(".turbo wake") + r.sendServerMessage(".turbo wake") } func (r *readCommand) makeTurboWriter() TurboWriter { // Create a writer instance per file to keep concurrent processing isolated. if r.server.Serverless() { - return NewDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless()) + return NewGeneratedDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless(), r.generation, r.server.ActiveSessionGeneration) } return &TurboNetworkWriter{ - turboLines: r.server.GetTurboChannel(), - serverMessages: r.server.ServerMessagesChannel(), - hostname: r.server.Hostname(), - plain: r.server.PlainOutput(), - serverless: r.server.Serverless(), + turboLines: r.server.GetTurboChannel(), + serverMessages: r.server.ServerMessagesChannel(), + hostname: r.server.Hostname(), + plain: r.server.PlainOutput(), + serverless: r.server.Serverless(), + generation: r.generation, + activeGeneration: r.server.ActiveSessionGeneration, } } @@ -428,10 +436,68 @@ func (r *readCommand) makeGlobID(path, glob string) string { return pathParts[len(pathParts)-1] } - r.server.SendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob)) + r.sendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob)) return "" } +func (r *readCommand) sendServerMessage(message string) { + r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message+"\n") +} + +func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) { + serverMessages := make(chan string, 16) + go func() { + for { + select { + case message, ok := <-serverMessages: + if !ok { + return + } + select { + case r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message): + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + return serverMessages, func() { + close(serverMessages) + } +} + +func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line { + linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) + go func() { + for { + select { + case generatedLine, ok := <-linesCh: + if !ok { + return + } + if generatedLine == nil { + continue + } + generatedLine.Generation = r.generation + select { + case r.server.SharedLinesChannel() <- generatedLine: + case <-ctx.Done(): + if generatedLine.Content != nil { + pool.RecycleBytesBuffer(generatedLine.Content) + } + generatedLine.Recycle() + return + } + case <-ctx.Done(): + return + } + } + }() + return linesCh +} + func (r *readCommand) isInputFromPipe() bool { if !r.server.Serverless() { // Can read from pipe only in serverless mode. diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index d073682..8c3cb96 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -37,6 +37,7 @@ type readCommandLifecycle interface { AddPendingFiles(delta int32) int32 CompletePendingFile() (remaining int32, activeCommands int32) PendingAndActive() (pending int32, activeCommands int32) + ActiveSessionGeneration() uint64 TriggerShutdown() } @@ -167,6 +168,11 @@ func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) return pending, activeCommands } +// ActiveSessionGeneration returns the currently active interactive session generation. +func (h *ServerHandler) ActiveSessionGeneration() uint64 { + return h.sessionState.currentGeneration() +} + // TriggerShutdown starts the handler shutdown sequence. func (h *ServerHandler) TriggerShutdown() { h.shutdown() diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 79d03b8..ef64468 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -71,6 +71,7 @@ func NewServerHandler(user *user.User, catLimiter, h.handleCommandCb = h.handleUserCommand h.commands = h.newCommandRegistry() h.turbo.configure(h.turboManagerConfig()) + h.baseHandler.activeGeneration = h.sessionState.currentGeneration fqdn, err := config.Hostname() if err != nil { @@ -160,8 +161,10 @@ func (h *ServerHandler) handleMapCommand(ctx context.Context, _ lcontext.LContex h.aggregate = aggregate h.turboAggregate = turboAggregate + maprMessages, closeMaprMessages := h.newGeneratedMaprMessagesChannel(ctx, sessionGenerationFromContext(ctx)) go func() { - command.Start(ctx, h.maprMessages) + defer closeMaprMessages() + command.Start(ctx, maprMessages) commandFinished() }() } @@ -205,3 +208,25 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont h.authKeyStore.Add(h.user.Name, pubKey) h.sendln(h.serverMessages, "AUTHKEY OK") } + +func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, generation uint64) (chan string, func()) { + maprMessages := make(chan string, 16) + go func() { + for { + select { + case message, ok := <-maprMessages: + if !ok { + return + } + h.send(h.maprMessages, encodeGeneratedMessage(generation, message)) + case <-ctx.Done(): + return + case <-h.done.Done(): + return + } + } + }() + return maprMessages, func() { + close(maprMessages) + } +} diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go index 0d54963..25b8d15 100644 --- a/internal/server/handlers/sessioncommand.go +++ b/internal/server/handlers/sessioncommand.go @@ -140,6 +140,7 @@ func (s *sessionCommandState) start(handler *ServerHandler, spec session.Spec) ( s.spec = spec s.cancel = cancel s.mu.Unlock() + ctx = withSessionGeneration(ctx, 1) handler.resetSessionAggregates() if err := handler.dispatchSessionCommands(ctx, commands); err != nil { @@ -172,6 +173,7 @@ func (s *sessionCommandState) update(handler *ServerHandler, spec session.Spec, s.spec = spec s.cancel = cancel s.mu.Unlock() + ctx = withSessionGeneration(ctx, generation) if oldCancel != nil { oldCancel() @@ -220,6 +222,12 @@ func (s *sessionCommandState) keepAlive() bool { return s.active } +func (s *sessionCommandState) currentGeneration() uint64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.generation +} + func (s *sessionCommandState) reset() { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go index deed383..4b4a883 100644 --- a/internal/server/handlers/turbo_manager.go +++ b/internal/server/handlers/turbo_manager.go @@ -195,7 +195,7 @@ func (t *turboManager) flush(user *user.User) { // tryRead tries to serve data from turbo state and channels. // Returns handled=false when caller should continue with normal path. -func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) { +func (t *turboManager) tryRead(p []byte, user *user.User, shouldDropGeneration func(uint64) bool) (n int, handled bool) { if !t.mode { return 0, false } @@ -215,57 +215,69 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) channelLen := len(t.lines) dlog.Server.Trace(user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen) - select { - case turboData := <-t.lines: - dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) - t.eofEmptySince = time.Time{} - n = copy(p, turboData) - if n < len(turboData) { - t.buffer = turboData[n:] - dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer)) - } - return n, true - default: - if channelLen > 0 { - dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting") - time.Sleep(t.resolvedReadRetryInterval()) - select { - case turboData := <-t.lines: - dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) + for { + select { + case turboData := <-t.lines: + generation, decodedData := decodeGeneratedBytes(turboData) + if shouldDropGeneration != nil && shouldDropGeneration(generation) { t.eofEmptySince = time.Time{} - n = copy(p, turboData) - if n < len(turboData) { - t.buffer = turboData[n:] - } - return n, true - default: - // Still no data. + continue } - } - - if t.eof != nil { - select { - case <-t.eof: - if len(t.lines) > 0 { + dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(decodedData)) + t.eofEmptySince = time.Time{} + n = copy(p, decodedData) + if n < len(decodedData) { + t.buffer = decodedData[n:] + dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer)) + } + return n, true + default: + if channelLen > 0 { + dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting") + time.Sleep(t.resolvedReadRetryInterval()) + select { + case turboData := <-t.lines: + generation, decodedData := decodeGeneratedBytes(turboData) + if shouldDropGeneration != nil && shouldDropGeneration(generation) { + t.eofEmptySince = time.Time{} + continue + } + dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(decodedData)) t.eofEmptySince = time.Time{} - break - } - - if t.eofEmptySince.IsZero() { - t.eofEmptySince = time.Now() - break + n = copy(p, decodedData) + if n < len(decodedData) { + t.buffer = decodedData[n:] + } + return n, true + default: + // Still no data. } + } - if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() { - dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode") - t.mode = false - t.signalEOFAck() + if t.eof != nil { + select { + case <-t.eof: + if len(t.lines) > 0 { + t.eofEmptySince = time.Time{} + break + } + + if t.eofEmptySince.IsZero() { + t.eofEmptySince = time.Now() + break + } + + if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() { + dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode") + t.mode = false + t.signalEOFAck() + } + default: } - default: } - } - dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through") - return 0, false + dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through") + return 0, false + } } } diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go index af124e9..fa12f72 100644 --- a/internal/server/handlers/turbo_writer.go +++ b/internal/server/handlers/turbo_writer.go @@ -30,6 +30,7 @@ type DirectTurboWriter struct { hostname string plain bool serverless bool + generation uint64 // Buffering for efficiency writeBuf bytes.Buffer @@ -39,6 +40,8 @@ type DirectTurboWriter struct { // Stats linesWritten uint64 bytesWritten uint64 + + activeGeneration func() uint64 } var _ TurboWriter = (*DirectTurboWriter)(nil) @@ -54,9 +57,19 @@ func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless b } } +func NewGeneratedDirectTurboWriter(writer io.Writer, hostname string, plain, serverless bool, generation uint64, activeGeneration func() uint64) *DirectTurboWriter { + w := NewDirectTurboWriter(writer, hostname, plain, serverless) + w.generation = generation + w.activeGeneration = activeGeneration + return w +} + // WriteLineData writes formatted line data directly to output. // Dispatches to serverless or network mode handlers based on configuration. func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } w.mutex.Lock() defer w.mutex.Unlock() @@ -135,6 +148,9 @@ func (w *DirectTurboWriter) writeNetworkLine(lineContent []byte, lineNum uint64, // WriteServerMessage writes a server message func (w *DirectTurboWriter) WriteServerMessage(message string) error { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } if w.serverless { return nil } @@ -209,6 +225,7 @@ type TurboChannelWriter struct { hostname string plain bool serverless bool + generation uint64 // Buffering for efficiency writeBuf bytes.Buffer @@ -218,6 +235,8 @@ type TurboChannelWriter struct { // Stats linesWritten uint64 bytesWritten uint64 + + activeGeneration func() uint64 } var _ TurboWriter = (*TurboChannelWriter)(nil) @@ -233,8 +252,18 @@ func NewTurboChannelWriter(channel chan<- []byte, hostname string, plain, server } } +func NewGeneratedTurboChannelWriter(channel chan<- []byte, hostname string, plain, serverless bool, generation uint64, activeGeneration func() uint64) *TurboChannelWriter { + w := NewTurboChannelWriter(channel, hostname, plain, serverless) + w.generation = generation + w.activeGeneration = activeGeneration + return w +} + // WriteLineData formats and writes line data to the turbo channel func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } w.mutex.Lock() defer w.mutex.Unlock() @@ -255,7 +284,7 @@ func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, s w.writeBuf.Reset() select { - case w.channel <- data: + case w.channel <- encodeGeneratedBytes(w.generation, data): return nil default: return fmt.Errorf("turbo channel full") @@ -264,6 +293,9 @@ func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, s // WriteServerMessage writes a server message func (w *TurboChannelWriter) WriteServerMessage(message string) error { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } if w.serverless { return nil } @@ -288,7 +320,7 @@ func (w *TurboChannelWriter) WriteServerMessage(message string) error { data := buf.Bytes() select { - case w.channel <- data: + case w.channel <- encodeGeneratedBytes(w.generation, data): return nil default: return fmt.Errorf("turbo channel full") @@ -315,6 +347,7 @@ type TurboNetworkWriter struct { hostname string plain bool serverless bool + generation uint64 // Internal buffer for batching writes writeBuf bytes.Buffer @@ -324,11 +357,16 @@ type TurboNetworkWriter struct { // Stats linesWritten uint64 bytesWritten uint64 + + activeGeneration func() uint64 } // WriteLineData formats and writes line data directly to the turbo channel. // Builds the protocol-formatted line and sends it via sendToTurboChannel. func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } w.mutex.Lock() defer w.mutex.Unlock() @@ -366,7 +404,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // Send data to turbo channel, retry once if full select { - case w.turboLines <- data: + case w.turboLines <- encodeGeneratedBytes(w.generation, data): dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully") w.writeBuf.Reset() return nil @@ -374,7 +412,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // Channel full, wait a bit and retry dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry") time.Sleep(time.Millisecond) - w.turboLines <- data + w.turboLines <- encodeGeneratedBytes(w.generation, data) dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry") w.writeBuf.Reset() return nil @@ -383,11 +421,14 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error { // WriteServerMessage writes a server message func (w *TurboNetworkWriter) WriteServerMessage(message string) error { + if !shouldWriteGeneration(w.generation, w.activeGeneration) { + return nil + } // Server messages are less critical in turbo mode // We can send them through the normal channel if w.serverMessages != nil { select { - case w.serverMessages <- message: + case w.serverMessages <- encodeGeneratedMessage(w.generation, message): return nil default: return fmt.Errorf("server message channel full") @@ -412,7 +453,7 @@ func (w *TurboNetworkWriter) Flush() error { copy(data, w.writeBuf.Bytes()) // Force send the data - w.turboLines <- data + w.turboLines <- encodeGeneratedBytes(w.generation, data) w.writeBuf.Reset() dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel") } @@ -437,6 +478,19 @@ func (w *TurboNetworkWriter) Flush() error { return nil } +func shouldWriteGeneration(generation uint64, activeGeneration func() uint64) bool { + if generation == 0 || activeGeneration == nil { + return true + } + + currentGeneration := activeGeneration() + if currentGeneration == 0 { + return true + } + + return currentGeneration == generation +} + // DirectLineProcessor processes lines directly without channels in turbo mode type DirectLineProcessor struct { writer TurboWriter -- cgit v1.2.3