diff options
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 9677718..d4c9c30 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -328,23 +328,24 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext func (r *readCommand) readViaChannels() readStrategy { return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error { var linesCh chan *line.Line - closeLines := false + var closeLines func() if r.server.HasRegularAggregate() { // For MapReduce operations, create a new channel that goes only to the aggregate. linesCh = make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) r.server.RegisterAggregateLines(linesCh) - closeLines = true + closeLines = func() { + close(linesCh) + } } else { // For non-MapReduce operations, forward lines through a generation-aware channel. - linesCh = r.newGeneratedLinesChannel(ctx) - closeLines = true + linesCh, closeLines = r.newGeneratedLinesChannel(ctx) } err := reader.Start(ctx, ltx, linesCh, re) - if closeLines { + if closeLines != nil { // Closing the aggregate line channel triggers flush. - close(linesCh) + closeLines() } return err @@ -463,7 +464,9 @@ func (r *readCommand) sendServerMessage(message string) { func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) { serverMessages := make(chan string, 16) + done := make(chan struct{}) go func() { + defer close(done) for { select { case message, ok := <-serverMessages: @@ -482,12 +485,15 @@ func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (ch }() return serverMessages, func() { close(serverMessages) + <-done } } -func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line { +func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) (chan *line.Line, func()) { linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) + done := make(chan struct{}) go func() { + defer close(done) for { select { case generatedLine, ok := <-linesCh: @@ -512,7 +518,10 @@ func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.L } } }() - return linesCh + return linesCh, func() { + close(linesCh) + <-done + } } func (r *readCommand) isInputFromPipe() bool { |
