summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go25
1 files changed, 17 insertions, 8 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 9677718..d4c9c30 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -328,23 +328,24 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
func (r *readCommand) readViaChannels() readStrategy {
return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error {
var linesCh chan *line.Line
- closeLines := false
+ var closeLines func()
if r.server.HasRegularAggregate() {
// For MapReduce operations, create a new channel that goes only to the aggregate.
linesCh = make(chan *line.Line, r.server.AggregateLinesChannelBufferSize())
r.server.RegisterAggregateLines(linesCh)
- closeLines = true
+ closeLines = func() {
+ close(linesCh)
+ }
} else {
// For non-MapReduce operations, forward lines through a generation-aware channel.
- linesCh = r.newGeneratedLinesChannel(ctx)
- closeLines = true
+ linesCh, closeLines = r.newGeneratedLinesChannel(ctx)
}
err := reader.Start(ctx, ltx, linesCh, re)
- if closeLines {
+ if closeLines != nil {
// Closing the aggregate line channel triggers flush.
- close(linesCh)
+ closeLines()
}
return err
@@ -463,7 +464,9 @@ func (r *readCommand) sendServerMessage(message string) {
func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (chan string, func()) {
serverMessages := make(chan string, 16)
+ done := make(chan struct{})
go func() {
+ defer close(done)
for {
select {
case message, ok := <-serverMessages:
@@ -482,12 +485,15 @@ func (r *readCommand) newGeneratedServerMessagesChannel(ctx context.Context) (ch
}()
return serverMessages, func() {
close(serverMessages)
+ <-done
}
}
-func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.Line {
+func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) (chan *line.Line, func()) {
linesCh := make(chan *line.Line, r.server.AggregateLinesChannelBufferSize())
+ done := make(chan struct{})
go func() {
+ defer close(done)
for {
select {
case generatedLine, ok := <-linesCh:
@@ -512,7 +518,10 @@ func (r *readCommand) newGeneratedLinesChannel(ctx context.Context) chan *line.L
}
}
}()
- return linesCh
+ return linesCh, func() {
+ close(linesCh)
+ <-done
+ }
}
func (r *readCommand) isInputFromPipe() bool {