summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go100
1 files changed, 83 insertions, 17 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 493f4b7..9c85889 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -12,6 +12,7 @@ import (
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
@@ -21,6 +22,7 @@ import (
type readCommand struct {
server readCommandServer
mode omode.Mode
+ generation uint64
shutdownCoordinator *shutdownCoordinator
}
@@ -42,19 +44,20 @@ func newReadCommand(server readCommandServer, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
+ r.generation = sessionGenerationFromContext(ctx)
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
if err != nil {
- r.server.SendServerMessage(dlog.Server.Error(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Error(r.server.LogContext(),
"Unable to parse command", err))
return
}
re = deserializedRegex
}
if argc < 3 {
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to parse command", args, argc))
return
}
@@ -91,7 +94,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
if numPaths := len(paths); numPaths == 0 {
dlog.Server.Error(r.server.LogContext(), "No such file(s) to read", glob)
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to read file(s), check server logs"))
select {
case <-ctx.Done():
@@ -106,7 +109,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
return
}
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Giving up to read file(s)"))
return
}
@@ -186,7 +189,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
globID := r.makeGlobID(path, glob)
if !r.server.CanReadFile(path) {
dlog.Server.Error(r.server.LogContext(), "No permission to read file", path, globID)
- r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
+ r.sendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to read file(s), check server logs"))
return
}
@@ -201,16 +204,18 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
var reader fs.FileReader
var limiter chan struct{}
+ serverMessages, closeServerMessages := r.newGeneratedServerMessagesChannel(ctx)
+ defer closeServerMessages()
switch r.mode {
case omode.GrepClient, omode.CatClient:
- catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
+ catFile := fs.NewCatFile(path, globID, serverMessages, r.server.MaxLineLength())
reader = &catFile
limiter = r.server.CatLimiter()
case omode.TailClient:
fallthrough
default:
- tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel(), r.server.MaxLineLength())
+ tailFile := fs.NewTailFile(path, globID, serverMessages, r.server.MaxLineLength())
reader = &tailFile
limiter = r.server.TailLimiter()
}
@@ -316,8 +321,9 @@ func (r *readCommand) readViaChannels() readStrategy {
r.server.RegisterAggregateLines(linesCh)
closeLines = true
} else {
- // For non-MapReduce operations, use the server's shared lines channel.
- linesCh = r.server.SharedLinesChannel()
+ // For non-MapReduce operations, forward lines through a generation-aware channel.
+ linesCh = r.newGeneratedLinesChannel(ctx)
+ closeLines = true
}
err := reader.Start(ctx, ltx, linesCh, re)
@@ -373,21 +379,23 @@ func (r *readCommand) ensureTurboModeEnabled() {
}
r.server.EnableTurboMode()
// Wake a potentially blocked reader goroutine so it can switch to turbo drain path.
- r.server.SendServerMessage(".turbo wake")
+ r.sendServerMessage(".turbo wake")
}
func (r *readCommand) makeTurboWriter() TurboWriter {
// Create a writer instance per file to keep concurrent processing isolated.
if r.server.Serverless() {
- return NewDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless())
+ return NewGeneratedDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless(), r.generation, r.server.ActiveSessionGeneration)
}
return &TurboNetworkWriter{
- turboLines: r.server.GetTurboChannel(),
- serverMessages: r.server.ServerMessagesChannel(),
- hostname: r.server.Hostname(),
- plain: r.server.PlainOutput(),
- serverless: r.server.Serverless(),
+ turboLines: r.server.GetTurboChannel(),
+ serverMessages: r.server.ServerMessagesChannel(),
+ hostname: r.server.Hostname(),
+ plain: r.server.PlainOutput(),
+ serverless: r.server.Serverless(),
+ generation: r.generation,
+ activeGeneration: r.server.ActiveSessionGeneration,
}
}
@@ -428,10 +436,68 @@ func (r *readCommand) makeGlobID(path, glob string) string {
return pathParts[len(pathParts)-1]
}
- r.server.SendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob))
+ r.sendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
+func (r *readCommand) sendServerMessage(message string) {
+ r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message+"\n")
+}
+
+func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) {
+ serverMessages := make(chan string, 16)
+ go func() {
+ for {
+ select {
+ case message, ok := <-serverMessages:
+ if !ok {
+ return
+ }
+ select {
+ case r.server.ServerMessagesChannel() <- encodeGeneratedMessage(r.generation, message):
+ case <-ctx.Done():
+ return
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return serverMessages, func() {
+ close(serverMessages)
+ }
+}
+
+func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line {
+ linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize())
+ go func() {
+ for {
+ select {
+ case generatedLine, ok := <-linesCh:
+ if !ok {
+ return
+ }
+ if generatedLine == nil {
+ continue
+ }
+ generatedLine.Generation = r.generation
+ select {
+ case r.server.SharedLinesChannel() <- generatedLine:
+ case <-ctx.Done():
+ if generatedLine.Content != nil {
+ pool.RecycleBytesBuffer(generatedLine.Content)
+ }
+ generatedLine.Recycle()
+ return
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return linesCh
+}
+
func (r *readCommand) isInputFromPipe() bool {
if !r.server.Serverless() {
// Can read from pipe only in serverless mode.