summaryrefslogtreecommitdiff
path: root/internal/server/handlers
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 09:13:51 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 09:13:51 +0200
commit7a79d0a8bf58b05dfbae331d00275739530b9584 (patch)
tree156a7c91984f11cb334a589649f337e8fa7c434d /internal/server/handlers
parent9f6850fc202e048dcdbfa6ffb59589d4a851cd84 (diff)
task 682e6ae9: filter stale generation output
Diffstat (limited to 'internal/server/handlers')
-rw-r--r--internal/server/handlers/basehandler.go166
-rw-r--r--internal/server/handlers/generation_output.go75
-rw-r--r--internal/server/handlers/generation_output_test.go115
-rw-r--r--internal/server/handlers/readcommand.go100
-rw-r--r--internal/server/handlers/readcommand_server.go6
-rw-r--r--internal/server/handlers/serverhandler.go27
-rw-r--r--internal/server/handlers/sessioncommand.go8
-rw-r--r--internal/server/handlers/turbo_manager.go102
-rw-r--r--internal/server/handlers/turbo_writer.go66
9 files changed, 533 insertions, 132 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 42cc4cc..66c2cb7 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -47,6 +47,8 @@ type baseHandler struct {
serverless bool
turbo turboManager
+
+ activeGeneration func() uint64
}
// Shutdown the handler.
@@ -73,83 +75,108 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
- if n, handled := h.turbo.tryRead(p, h.user); handled {
- return n, nil
- }
-
- pollInterval := time.Second
- if h.turbo.enabled() {
- // Turbo reads require tighter wake-ups so we can continue draining the turbo channel.
- pollInterval = h.turbo.resolvedReadRetryInterval()
- }
- poll := time.After(pollInterval)
-
- select {
- case message := <-h.serverMessages:
- if len(message) > 0 && message[0] == '.' {
- // Handle hidden message (don't display to the user)
- h.readBuf.WriteString(message)
- h.readBuf.WriteByte(protocol.MessageDelimiter)
- n = copy(p, h.readBuf.Bytes())
- return
+ for {
+ if n, handled := h.turbo.tryRead(p, h.user, h.shouldDropGeneration); handled {
+ if n == 0 {
+ continue
+ }
+ return n, nil
}
- if h.serverless {
- return
+ pollInterval := time.Second
+ if h.turbo.enabled() {
+ // Turbo reads require tighter wake-ups so we can continue draining the turbo channel.
+ pollInterval = h.turbo.resolvedReadRetryInterval()
}
+ poll := time.After(pollInterval)
- // Skip empty server messages when in plain mode
- if h.plain && (message == "" || message == "\n") {
+ select {
+ case message := <-h.serverMessages:
+ generation, decodedMessage := decodeGeneratedMessage(message)
+ if h.shouldDropGeneration(generation) {
+ continue
+ }
+ message = decodedMessage
+ if len(message) > 0 && message[0] == '.' {
+ // Handle hidden message (don't display to the user)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+ return
+ }
+
+ if h.serverless {
+ return
+ }
+
+ // Skip empty server messages when in plain mode
+ if h.plain && (message == "" || message == "\n") {
+ return
+ }
+
+ // Handle normal server message (display to the user).
+ formatServerMessage(&h.readBuf, h.hostname, message, h.plain)
+ n = copy(p, h.readBuf.Bytes())
return
- }
- // Handle normal server message (display to the user).
- formatServerMessage(&h.readBuf, h.hostname, message, h.plain)
- n = copy(p, h.readBuf.Bytes())
-
- case message := <-h.maprMessages:
- // Send mapreduce-aggregated data as a message.
- h.readBuf.WriteString("AGGREGATE")
- h.readBuf.WriteString(protocol.FieldDelimiter)
- h.readBuf.WriteString(h.hostname)
- h.readBuf.WriteString(protocol.FieldDelimiter)
- h.readBuf.WriteString(message)
- h.readBuf.WriteByte(protocol.MessageDelimiter)
- n = copy(p, h.readBuf.Bytes())
-
- case line := <-h.lines:
- if h.plain {
- h.readBuf.Write(line.Content.Bytes())
+ case message := <-h.maprMessages:
+ generation, decodedMessage := decodeGeneratedMessage(message)
+ if h.shouldDropGeneration(generation) {
+ continue
+ }
+ message = decodedMessage
+ // Send mapreduce-aggregated data as a message.
+ h.readBuf.WriteString("AGGREGATE")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(message)
h.readBuf.WriteByte(protocol.MessageDelimiter)
- } else {
- formatRemoteLine(
- &h.readBuf,
- h.hostname,
- fmt.Sprintf("%3d", line.TransmittedPerc),
- line.Count,
- line.SourceID,
- line.Content.Bytes(),
- )
- }
- n = copy(p, h.readBuf.Bytes())
- pool.RecycleBytesBuffer(line.Content)
- line.Recycle()
+ n = copy(p, h.readBuf.Bytes())
+ return
- case <-h.done.Done():
- err = io.EOF
- return
+ case line := <-h.lines:
+ if line == nil {
+ continue
+ }
+ if h.shouldDropGeneration(line.Generation) {
+ pool.RecycleBytesBuffer(line.Content)
+ line.Recycle()
+ continue
+ }
+ if h.plain {
+ h.readBuf.Write(line.Content.Bytes())
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ } else {
+ formatRemoteLine(
+ &h.readBuf,
+ h.hostname,
+ fmt.Sprintf("%3d", line.TransmittedPerc),
+ line.Count,
+ line.SourceID,
+ line.Content.Bytes(),
+ )
+ }
+ n = copy(p, h.readBuf.Bytes())
+ pool.RecycleBytesBuffer(line.Content)
+ line.Recycle()
+ return
- case <-poll:
- // Wake periodically so turbo mode transitions don't leave this read blocked forever.
- select {
case <-h.done.Done():
err = io.EOF
return
- default:
+
+ case <-poll:
+ // Wake periodically so turbo mode transitions don't leave this read blocked forever.
+ select {
+ case <-h.done.Done():
+ err = io.EOF
+ return
+ default:
+ }
+ return
}
- return
}
- return
}
// Write is to receive data from the dtail client via Writer interface.
@@ -288,6 +315,19 @@ func (h *baseHandler) sendln(ch chan<- string, message string) {
h.send(ch, message+"\n")
}
+func (h *baseHandler) shouldDropGeneration(generation uint64) bool {
+ if generation == 0 || h.activeGeneration == nil {
+ return false
+ }
+
+ activeGeneration := h.activeGeneration()
+ if activeGeneration == 0 {
+ return false
+ }
+
+ return activeGeneration != generation
+}
+
func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {
diff --git a/internal/server/handlers/generation_output.go b/internal/server/handlers/generation_output.go
new file mode 100644
index 0000000..aa9d195
--- /dev/null
+++ b/internal/server/handlers/generation_output.go
@@ -0,0 +1,75 @@
+package handlers
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+type sessionGenerationKey struct{}
+
+const generationOutputPrefix = "\x1egen:"
+
+func withSessionGeneration(ctx context.Context, generation uint64) context.Context {
+ if ctx == nil || generation == 0 {
+ return ctx
+ }
+ return context.WithValue(ctx, sessionGenerationKey{}, generation)
+}
+
+func sessionGenerationFromContext(ctx context.Context) uint64 {
+ if ctx == nil {
+ return 0
+ }
+
+ generation, _ := ctx.Value(sessionGenerationKey{}).(uint64)
+ return generation
+}
+
+func encodeGeneratedMessage(generation uint64, message string) string {
+ if generation == 0 {
+ return message
+ }
+ return fmt.Sprintf("%s%d\x1e%s", generationOutputPrefix, generation, message)
+}
+
+func decodeGeneratedMessage(message string) (uint64, string) {
+ if !strings.HasPrefix(message, generationOutputPrefix) {
+ return 0, message
+ }
+
+ rest := strings.TrimPrefix(message, generationOutputPrefix)
+ parts := strings.SplitN(rest, "\x1e", 2)
+ if len(parts) != 2 {
+ return 0, message
+ }
+
+ generation, err := strconv.ParseUint(parts[0], 10, 64)
+ if err != nil {
+ return 0, message
+ }
+
+ return generation, parts[1]
+}
+
+func encodeGeneratedBytes(generation uint64, payload []byte) []byte {
+ if generation == 0 {
+ return payload
+ }
+
+ prefix := []byte(fmt.Sprintf("%s%d\x1e", generationOutputPrefix, generation))
+ data := make([]byte, 0, len(prefix)+len(payload))
+ data = append(data, prefix...)
+ data = append(data, payload...)
+ return data
+}
+
+func decodeGeneratedBytes(payload []byte) (uint64, []byte) {
+ message := string(payload)
+ generation, decoded := decodeGeneratedMessage(message)
+ if generation == 0 {
+ return 0, payload
+ }
+ return generation, []byte(decoded)
+}
diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go
new file mode 100644
index 0000000..6020c09
--- /dev/null
+++ b/internal/server/handlers/generation_output_test.go
@@ -0,0 +1,115 @@
+package handlers
+
+import (
+ "bytes"
+ "strings"
+ "testing"
+
+ "github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/io/line"
+ userserver "github.com/mimecast/dtail/internal/user/server"
+)
+
+func TestDecodeGeneratedMessage(t *testing.T) {
+ generation, message := decodeGeneratedMessage(encodeGeneratedMessage(7, "hello"))
+ if generation != 7 {
+ t.Fatalf("unexpected generation: %d", generation)
+ }
+ if message != "hello" {
+ t.Fatalf("unexpected message: %q", message)
+ }
+}
+
+func TestBaseHandlerReadDropsStaleServerMessage(t *testing.T) {
+ handler := newGenerationTestHandler(2)
+ handler.serverMessages <- encodeGeneratedMessage(1, "stale\n")
+ handler.serverMessages <- encodeGeneratedMessage(2, "fresh\n")
+
+ got := readHandlerOutput(t, &handler)
+ if strings.Contains(got, "stale") {
+ t.Fatalf("unexpected stale output: %q", got)
+ }
+ if !strings.Contains(got, "fresh") {
+ t.Fatalf("expected current output, got %q", got)
+ }
+}
+
+func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) {
+ handler := newGenerationTestHandler(3)
+ handler.maprMessages <- encodeGeneratedMessage(2, "old aggregate")
+ handler.maprMessages <- encodeGeneratedMessage(3, "new aggregate")
+
+ got := readHandlerOutput(t, &handler)
+ if strings.Contains(got, "old aggregate") {
+ t.Fatalf("unexpected stale aggregate output: %q", got)
+ }
+ if !strings.Contains(got, "new aggregate") {
+ t.Fatalf("expected current aggregate output, got %q", got)
+ }
+}
+
+func TestBaseHandlerReadDropsStaleLine(t *testing.T) {
+ handler := newGenerationTestHandler(4)
+
+ staleLine := line.New(bytes.NewBufferString("stale line"), 1, 100, "app.log")
+ staleLine.Generation = 3
+ currentLine := line.New(bytes.NewBufferString("fresh line"), 2, 100, "app.log")
+ currentLine.Generation = 4
+
+ handler.lines <- staleLine
+ handler.lines <- currentLine
+
+ got := readHandlerOutput(t, &handler)
+ if strings.Contains(got, "stale line") {
+ t.Fatalf("unexpected stale line output: %q", got)
+ }
+ if !strings.Contains(got, "fresh line") {
+ t.Fatalf("expected current line output, got %q", got)
+ }
+}
+
+func TestTurboManagerTryReadDropsStaleGeneration(t *testing.T) {
+ resetServerLogger(t)
+
+ manager := turboManager{
+ mode: true,
+ lines: make(chan []byte, 2),
+ }
+ manager.lines <- encodeGeneratedBytes(1, []byte("stale"))
+ manager.lines <- encodeGeneratedBytes(2, []byte("fresh"))
+
+ buf := make([]byte, 32)
+ n, handled := manager.tryRead(buf, &userserver.User{Name: "turbo-test"}, func(generation uint64) bool {
+ return generation != 0 && generation != 2
+ })
+ if !handled {
+ t.Fatalf("expected turbo read to be handled")
+ }
+ if got := string(buf[:n]); got != "fresh" {
+ t.Fatalf("unexpected turbo output: %q", got)
+ }
+}
+
+func newGenerationTestHandler(activeGeneration uint64) baseHandler {
+ return baseHandler{
+ done: internal.NewDone(),
+ lines: make(chan *line.Line, 2),
+ serverMessages: make(chan string, 2),
+ maprMessages: make(chan string, 2),
+ hostname: "testhost",
+ activeGeneration: func() uint64 {
+ return activeGeneration
+ },
+ }
+}
+
+func readHandlerOutput(t *testing.T, handler *baseHandler) string {
+ t.Helper()
+
+ buf := make([]byte, 256)
+ n, err := handler.Read(buf)
+ if err != nil {
+ t.Fatalf("Read() error = %v", err)
+ }
+ return string(buf[:n])
+}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 493f4b7..9c85889 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -12,6 +12,7 @@ import (
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
@@ -21,6 +22,7 @@ import (
type readCommand struct {
server readCommandServer
mode omode.Mode
+ generation uint64
shutdownCoordinator *shutdownCoordinator
}
@@ -42,19 +44,20 @@ func newReadCommand(server readCommandServer, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
+ r.generation = sessionGenerationFromContext(ctx)
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
if err != nil {
- r.server.SendServerMessage(dlog.Server.Error(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Error(r.server.LogContext(),
"Unable to parse command", err))
return
}
re = deserializedRegex
}
if argc < 3 {
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to parse command", args, argc))
return
}
@@ -91,7 +94,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
if numPaths := len(paths); numPaths == 0 {
dlog.Server.Error(r.server.LogContext(), "No such file(s) to read", glob)
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to read file(s), check server logs"))
select {
case <-ctx.Done():
@@ -106,7 +109,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
return
}
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Giving up to read file(s)"))
return
}
@@ -186,7 +189,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
globID := r.makeGlobID(path, glob)
if !r.server.CanReadFile(path) {
dlog.Server.Error(r.server.LogContext(), "No permission to read file", path, globID)
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to read file(s), check server logs"))
return
}
@@ -201,16 +204,18 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
var reader fs.FileReader
var limiter chan struct{}
+ serverMessages, closeServerMessages := r.newGeneratedServerMessagesChannel(ctx)
+ defer closeServerMessages()
switch r.mode {
case omode.GrepClient, omode.CatClient:
- catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
+ catFile := fs.NewCatFile(path, globID, serverMessages, r.server.MaxLineLength())
reader = &catFile
limiter = r.server.CatLimiter()
case omode.TailClient:
fallthrough
default:
- tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
+ tailFile := fs.NewTailFile(path, globID, serverMessages, r.server.MaxLineLength())
reader = &tailFile
limiter = r.server.TailLimiter()
}
@@ -316,8 +321,9 @@ func (r *readCommand) readViaChannels() readStrategy {
r.server.RegisterAggregateLines(linesCh)
closeLines = true
} else {
- // For non-MapReduce operations, use the server's shared lines channel.
- linesCh = r.server.SharedLinesChannel()
+ // For non-MapReduce operations, forward lines through a generation-aware channel.
+ linesCh = r.newGeneratedLinesChannel(ctx)
+ closeLines = true
}
err := reader.Start(ctx, ltx, linesCh, re)
@@ -373,21 +379,23 @@ func (r *readCommand) ensureTurboModeEnabled() {
}
r.server.EnableTurboMode()
// Wake a potentially blocked reader goroutine so it can switch to turbo drain path.
- r.server.SendServerMessage(".turbo wake")
+ r.sendServerMessage(".turbo wake")
}
func (r *readCommand) makeTurboWriter() TurboWriter {
// Create a writer instance per file to keep concurrent processing isolated.
if r.server.Serverless() {
- return NewDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless())
+ return NewGeneratedDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless(), r.generation, r.server.ActiveSessionGeneration)
}
return &TurboNetworkWriter{
- turboLines: r.server.GetTurboChannel(),
- serverMessages: r.server.ServerMessagesChannel(),
- hostname: r.server.Hostname(),
- plain: r.server.PlainOutput(),
- serverless: r.server.Serverless(),
+ turboLines: r.server.GetTurboChannel(),
+ serverMessages: r.server.ServerMessagesChannel(),
+ hostname: r.server.Hostname(),
+ plain: r.server.PlainOutput(),
+ serverless: r.server.Serverless(),
+ generation: r.generation,
+ activeGeneration: r.server.ActiveSessionGeneration,
}
}
@@ -428,10 +436,68 @@ func (r *readCommand) makeGlobID(path, glob string) string {
return pathParts[len(pathParts)-1]
}
- r.server.SendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob))
+ r.sendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
+func (r *readCommand) sendServerMessage(message string) {
+ r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message+"\n")
+}
+
+func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) {
+ serverMessages := make(chan string, 16)
+ go func() {
+ for {
+ select {
+ case message, ok := <-serverMessages:
+ if !ok {
+ return
+ }
+ select {
+ case r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message):
+ case <-ctx.Done():
+ return
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return serverMessages, func() {
+ close(serverMessages)
+ }
+}
+
+func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line {
+ linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize())
+ go func() {
+ for {
+ select {
+ case generatedLine, ok := <-linesCh:
+ if !ok {
+ return
+ }
+ if generatedLine == nil {
+ continue
+ }
+ generatedLine.Generation = r.generation
+ select {
+ case r.server.SharedLinesChannel() <- generatedLine:
+ case <-ctx.Done():
+ if generatedLine.Content != nil {
+ pool.RecycleBytesBuffer(generatedLine.Content)
+ }
+ generatedLine.Recycle()
+ return
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return linesCh
+}
+
func (r *readCommand) isInputFromPipe() bool {
if !r.server.Serverless() {
// Can read from pipe only in serverless mode.
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index d073682..8c3cb96 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -37,6 +37,7 @@ type readCommandLifecycle interface {
AddPendingFiles(delta int32) int32
CompletePendingFile() (remaining int32, activeCommands int32)
PendingAndActive() (pending int32, activeCommands int32)
+ ActiveSessionGeneration() uint64
TriggerShutdown()
}
@@ -167,6 +168,11 @@ func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32)
return pending, activeCommands
}
+// ActiveSessionGeneration returns the currently active interactive session generation.
+func (h *ServerHandler) ActiveSessionGeneration() uint64 {
+ return h.sessionState.currentGeneration()
+}
+
// TriggerShutdown starts the handler shutdown sequence.
func (h *ServerHandler) TriggerShutdown() {
h.shutdown()
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 79d03b8..ef64468 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -71,6 +71,7 @@ func NewServerHandler(user *user.User, catLimiter,
h.handleCommandCb = h.handleUserCommand
h.commands = h.newCommandRegistry()
h.turbo.configure(h.turboManagerConfig())
+ h.baseHandler.activeGeneration = h.sessionState.currentGeneration
fqdn, err := config.Hostname()
if err != nil {
@@ -160,8 +161,10 @@ func (h *ServerHandler) handleMapCommand(ctx context.Context, _ lcontext.LContex
h.aggregate = aggregate
h.turboAggregate = turboAggregate
+ maprMessages, closeMaprMessages := h.newGeneratedMaprMessagesChannel(ctx, sessionGenerationFromContext(ctx))
go func() {
- command.Start(ctx, h.maprMessages)
+ defer closeMaprMessages()
+ command.Start(ctx, maprMessages)
commandFinished()
}()
}
@@ -205,3 +208,25 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont
h.authKeyStore.Add(h.user.Name, pubKey)
h.sendln(h.serverMessages, "AUTHKEY OK")
}
+
+func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, generation uint64) (chan string, func()) {
+ maprMessages := make(chan string, 16)
+ go func() {
+ for {
+ select {
+ case message, ok := <-maprMessages:
+ if !ok {
+ return
+ }
+ h.send(h.maprMessages, encodeGeneratedMessage(generation, message))
+ case <-ctx.Done():
+ return
+ case <-h.done.Done():
+ return
+ }
+ }
+ }()
+ return maprMessages, func() {
+ close(maprMessages)
+ }
+}
diff --git a/internal/server/handlers/sessioncommand.go b/internal/server/handlers/sessioncommand.go
index 0d54963..25b8d15 100644
--- a/internal/server/handlers/sessioncommand.go
+++ b/internal/server/handlers/sessioncommand.go
@@ -140,6 +140,7 @@ func (s *sessionCommandState) start(handler *ServerHandler, spec session.Spec) (
s.spec = spec
s.cancel = cancel
s.mu.Unlock()
+ ctx = withSessionGeneration(ctx, 1)
handler.resetSessionAggregates()
if err := handler.dispatchSessionCommands(ctx, commands); err != nil {
@@ -172,6 +173,7 @@ func (s *sessionCommandState) update(handler *ServerHandler, spec session.Spec,
s.spec = spec
s.cancel = cancel
s.mu.Unlock()
+ ctx = withSessionGeneration(ctx, generation)
if oldCancel != nil {
oldCancel()
@@ -220,6 +222,12 @@ func (s *sessionCommandState) keepAlive() bool {
return s.active
}
+func (s *sessionCommandState) currentGeneration() uint64 {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.generation
+}
+
func (s *sessionCommandState) reset() {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go
index deed383..4b4a883 100644
--- a/internal/server/handlers/turbo_manager.go
+++ b/internal/server/handlers/turbo_manager.go
@@ -195,7 +195,7 @@ func (t *turboManager) flush(user *user.User) {
// tryRead tries to serve data from turbo state and channels.
// Returns handled=false when caller should continue with normal path.
-func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) {
+func (t *turboManager) tryRead(p []byte, user *user.User, shouldDropGeneration func(uint64) bool) (n int, handled bool) {
if !t.mode {
return 0, false
}
@@ -215,57 +215,69 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
channelLen := len(t.lines)
dlog.Server.Trace(user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
- select {
- case turboData := <-t.lines:
- dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
- t.eofEmptySince = time.Time{}
- n = copy(p, turboData)
- if n < len(turboData) {
- t.buffer = turboData[n:]
- dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer))
- }
- return n, true
- default:
- if channelLen > 0 {
- dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting")
- time.Sleep(t.resolvedReadRetryInterval())
- select {
- case turboData := <-t.lines:
- dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ for {
+ select {
+ case turboData := <-t.lines:
+ generation, decodedData := decodeGeneratedBytes(turboData)
+ if shouldDropGeneration != nil && shouldDropGeneration(generation) {
t.eofEmptySince = time.Time{}
- n = copy(p, turboData)
- if n < len(turboData) {
- t.buffer = turboData[n:]
- }
- return n, true
- default:
- // Still no data.
+ continue
}
- }
-
- if t.eof != nil {
- select {
- case <-t.eof:
- if len(t.lines) > 0 {
+ dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(decodedData))
+ t.eofEmptySince = time.Time{}
+ n = copy(p, decodedData)
+ if n < len(decodedData) {
+ t.buffer = decodedData[n:]
+ dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer))
+ }
+ return n, true
+ default:
+ if channelLen > 0 {
+ dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting")
+ time.Sleep(t.resolvedReadRetryInterval())
+ select {
+ case turboData := <-t.lines:
+ generation, decodedData := decodeGeneratedBytes(turboData)
+ if shouldDropGeneration != nil && shouldDropGeneration(generation) {
+ t.eofEmptySince = time.Time{}
+ continue
+ }
+ dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(decodedData))
t.eofEmptySince = time.Time{}
- break
- }
-
- if t.eofEmptySince.IsZero() {
- t.eofEmptySince = time.Now()
- break
+ n = copy(p, decodedData)
+ if n < len(decodedData) {
+ t.buffer = decodedData[n:]
+ }
+ return n, true
+ default:
+ // Still no data.
}
+ }
- if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() {
- dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode")
- t.mode = false
- t.signalEOFAck()
+ if t.eof != nil {
+ select {
+ case <-t.eof:
+ if len(t.lines) > 0 {
+ t.eofEmptySince = time.Time{}
+ break
+ }
+
+ if t.eofEmptySince.IsZero() {
+ t.eofEmptySince = time.Now()
+ break
+ }
+
+ if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() {
+ dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode")
+ t.mode = false
+ t.signalEOFAck()
+ }
+ default:
}
- default:
}
- }
- dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through")
- return 0, false
+ dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through")
+ return 0, false
+ }
}
}
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
index af124e9..fa12f72 100644
--- a/internal/server/handlers/turbo_writer.go
+++ b/internal/server/handlers/turbo_writer.go
@@ -30,6 +30,7 @@ type DirectTurboWriter struct {
hostname string
plain bool
serverless bool
+ generation uint64
// Buffering for efficiency
writeBuf bytes.Buffer
@@ -39,6 +40,8 @@ type DirectTurboWriter struct {
// Stats
linesWritten uint64
bytesWritten uint64
+
+ activeGeneration func() uint64
}
var _ TurboWriter = (*DirectTurboWriter)(nil)
@@ -54,9 +57,19 @@ func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless b
}
}
+func NewGeneratedDirectTurboWriter(writer io.Writer, hostname string, plain, serverless bool, generation uint64, activeGeneration func() uint64) *DirectTurboWriter {
+ w := NewDirectTurboWriter(writer, hostname, plain, serverless)
+ w.generation = generation
+ w.activeGeneration = activeGeneration
+ return w
+}
+
// WriteLineData writes formatted line data directly to output.
// Dispatches to serverless or network mode handlers based on configuration.
func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
w.mutex.Lock()
defer w.mutex.Unlock()
@@ -135,6 +148,9 @@ func (w *DirectTurboWriter) writeNetworkLine(lineContent []byte, lineNum uint64,
// WriteServerMessage writes a server message
func (w *DirectTurboWriter) WriteServerMessage(message string) error {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
if w.serverless {
return nil
}
@@ -209,6 +225,7 @@ type TurboChannelWriter struct {
hostname string
plain bool
serverless bool
+ generation uint64
// Buffering for efficiency
writeBuf bytes.Buffer
@@ -218,6 +235,8 @@ type TurboChannelWriter struct {
// Stats
linesWritten uint64
bytesWritten uint64
+
+ activeGeneration func() uint64
}
var _ TurboWriter = (*TurboChannelWriter)(nil)
@@ -233,8 +252,18 @@ func NewTurboChannelWriter(channel chan<- []byte, hostname string, plain, server
}
}
+func NewGeneratedTurboChannelWriter(channel chan<- []byte, hostname string, plain, serverless bool, generation uint64, activeGeneration func() uint64) *TurboChannelWriter {
+ w := NewTurboChannelWriter(channel, hostname, plain, serverless)
+ w.generation = generation
+ w.activeGeneration = activeGeneration
+ return w
+}
+
// WriteLineData formats and writes line data to the turbo channel
func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
w.mutex.Lock()
defer w.mutex.Unlock()
@@ -255,7 +284,7 @@ func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, s
w.writeBuf.Reset()
select {
- case w.channel <- data:
+ case w.channel <- encodeGeneratedBytes(w.generation, data):
return nil
default:
return fmt.Errorf("turbo channel full")
@@ -264,6 +293,9 @@ func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, s
// WriteServerMessage writes a server message
func (w *TurboChannelWriter) WriteServerMessage(message string) error {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
if w.serverless {
return nil
}
@@ -288,7 +320,7 @@ func (w *TurboChannelWriter) WriteServerMessage(message string) error {
data := buf.Bytes()
select {
- case w.channel <- data:
+ case w.channel <- encodeGeneratedBytes(w.generation, data):
return nil
default:
return fmt.Errorf("turbo channel full")
@@ -315,6 +347,7 @@ type TurboNetworkWriter struct {
hostname string
plain bool
serverless bool
+ generation uint64
// Internal buffer for batching writes
writeBuf bytes.Buffer
@@ -324,11 +357,16 @@ type TurboNetworkWriter struct {
// Stats
linesWritten uint64
bytesWritten uint64
+
+ activeGeneration func() uint64
}
// WriteLineData formats and writes line data directly to the turbo channel.
// Builds the protocol-formatted line and sends it via sendToTurboChannel.
func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
w.mutex.Lock()
defer w.mutex.Unlock()
@@ -366,7 +404,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// Send data to turbo channel, retry once if full
select {
- case w.turboLines <- data:
+ case w.turboLines <- encodeGeneratedBytes(w.generation, data):
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel successfully")
w.writeBuf.Reset()
return nil
@@ -374,7 +412,7 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// Channel full, wait a bit and retry
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "channel full, waiting before retry")
time.Sleep(time.Millisecond)
- w.turboLines <- data
+ w.turboLines <- encodeGeneratedBytes(w.generation, data)
dlog.Server.Trace("TurboNetworkWriter.sendToTurboChannel", "sent to channel after retry")
w.writeBuf.Reset()
return nil
@@ -383,11 +421,14 @@ func (w *TurboNetworkWriter) sendToTurboChannel() error {
// WriteServerMessage writes a server message
func (w *TurboNetworkWriter) WriteServerMessage(message string) error {
+ if !shouldWriteGeneration(w.generation, w.activeGeneration) {
+ return nil
+ }
// Server messages are less critical in turbo mode
// We can send them through the normal channel
if w.serverMessages != nil {
select {
- case w.serverMessages <- message:
+ case w.serverMessages <- encodeGeneratedMessage(w.generation, message):
return nil
default:
return fmt.Errorf("server message channel full")
@@ -412,7 +453,7 @@ func (w *TurboNetworkWriter) Flush() error {
copy(data, w.writeBuf.Bytes())
// Force send the data
- w.turboLines <- data
+ w.turboLines <- encodeGeneratedBytes(w.generation, data)
w.writeBuf.Reset()
dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
}
@@ -437,6 +478,19 @@ func (w *TurboNetworkWriter) Flush() error {
return nil
}
+func shouldWriteGeneration(generation uint64, activeGeneration func() uint64) bool {
+ if generation == 0 || activeGeneration == nil {
+ return true
+ }
+
+ currentGeneration := activeGeneration()
+ if currentGeneration == 0 {
+ return true
+ }
+
+ return currentGeneration == generation
+}
+
// DirectLineProcessor processes lines directly without channels in turbo mode
type DirectLineProcessor struct {
writer TurboWriter