diff options
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 100 |
1 files changed, 83 insertions, 17 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 493f4b7..9c85889 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -12,6 +12,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" @@ -21,6 +22,7 @@ import ( type readCommand struct { server readCommandServer mode omode.Mode + generation uint64 shutdownCoordinator *shutdownCoordinator } @@ -42,19 +44,20 @@ func newReadCommand(server readCommandServer, mode omode.Mode) *readCommand { func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, argc int, args []string, retries int) { + r.generation = sessionGenerationFromContext(ctx) re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.SendServerMessage(dlog.Server.Error(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Error(r.server.LogContext(), "Unable to parse command", err)) return } re = deserializedRegex } if argc < 3 { - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Unable to parse command", args, argc)) return } @@ -91,7 +94,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, if numPaths := len(paths); numPaths == 0 { dlog.Server.Error(r.server.LogContext(), "No such file(s) to read", glob) - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Unable to read file(s), check server logs")) select { case <-ctx.Done(): @@ -106,7 +109,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, return } - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Giving up to read file(s)")) return } @@ -186,7 +189,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC globID := r.makeGlobID(path, glob) if !r.server.CanReadFile(path) { dlog.Server.Error(r.server.LogContext(), "No permission to read file", path, globID) - r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(), + r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(), "Unable to read file(s), check server logs")) return } @@ -201,16 +204,18 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, var reader fs.FileReader var limiter chan struct{} + serverMessages, closeServerMessages := r.newGeneratedServerMessagesChannel(ctx) + defer closeServerMessages() switch r.mode { case omode.GrepClient, omode.CatClient: - catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) + catFile := fs.NewCatFile(path, globID, serverMessages, r.server.MaxLineLength()) reader = &catFile limiter = r.server.CatLimiter() case omode.TailClient: fallthrough default: - tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength()) + tailFile := fs.NewTailFile(path, globID, serverMessages, r.server.MaxLineLength()) reader = &tailFile limiter = r.server.TailLimiter() } @@ -316,8 +321,9 @@ func (r *readCommand) readViaChannels() readStrategy { r.server.RegisterAggregateLines(linesCh) closeLines = true } else { - // For non-MapReduce operations, use the server's shared lines channel. - linesCh = r.server.SharedLinesChannel() + // For non-MapReduce operations, forward lines through a generation-aware channel. + linesCh = r.newGeneratedLinesChannel(ctx) + closeLines = true } err := reader.Start(ctx, ltx, linesCh, re) @@ -373,21 +379,23 @@ func (r *readCommand) ensureTurboModeEnabled() { } r.server.EnableTurboMode() // Wake a potentially blocked reader goroutine so it can switch to turbo drain path. - r.server.SendServerMessage(".turbo wake") + r.sendServerMessage(".turbo wake") } func (r *readCommand) makeTurboWriter() TurboWriter { // Create a writer instance per file to keep concurrent processing isolated. if r.server.Serverless() { - return NewDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless()) + return NewGeneratedDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless(), r.generation, r.server.ActiveSessionGeneration) } return &TurboNetworkWriter{ - turboLines: r.server.GetTurboChannel(), - serverMessages: r.server.ServerMessagesChannel(), - hostname: r.server.Hostname(), - plain: r.server.PlainOutput(), - serverless: r.server.Serverless(), + turboLines: r.server.GetTurboChannel(), + serverMessages: r.server.ServerMessagesChannel(), + hostname: r.server.Hostname(), + plain: r.server.PlainOutput(), + serverless: r.server.Serverless(), + generation: r.generation, + activeGeneration: r.server.ActiveSessionGeneration, } } @@ -428,10 +436,68 @@ func (r *readCommand) makeGlobID(path, glob string) string { return pathParts[len(pathParts)-1] } - r.server.SendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob)) + r.sendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob)) return "" } +func (r *readCommand) sendServerMessage(message string) { + r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message+"\n") +} + +func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) { + serverMessages := make(chan string, 16) + go func() { + for { + select { + case message, ok := <-serverMessages: + if !ok { + return + } + select { + case r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message): + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } + }() + return serverMessages, func() { + close(serverMessages) + } +} + +func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line { + linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) + go func() { + for { + select { + case generatedLine, ok := <-linesCh: + if !ok { + return + } + if generatedLine == nil { + continue + } + generatedLine.Generation = r.generation + select { + case r.server.SharedLinesChannel() <- generatedLine: + case <-ctx.Done(): + if generatedLine.Content != nil { + pool.RecycleBytesBuffer(generatedLine.Content) + } + generatedLine.Recycle() + return + } + case <-ctx.Done(): + return + } + } + }() + return linesCh +} + func (r *readCommand) isInputFromPipe() bool { if !r.server.Serverless() { // Can read from pipe only in serverless mode. |
