diff options
Diffstat (limited to 'internal/server/handlers/generation_output_test.go')
| -rw-r--r-- | internal/server/handlers/generation_output_test.go | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go index 6020c09..393afac 100644 --- a/internal/server/handlers/generation_output_test.go +++ b/internal/server/handlers/generation_output_test.go @@ -2,8 +2,10 @@ package handlers import ( "bytes" + "context" "strings" "testing" + "time" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/line" @@ -48,6 +50,49 @@ func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) { } } +func TestGeneratedMaprMessagesChannelCloseWaitsForForwarding(t *testing.T) { + handler := &ServerHandler{ + baseHandler: baseHandler{ + done: internal.NewDone(), + maprMessages: make(chan string), + }, + } + + generated, closeGenerated := handler.newGeneratedMaprMessagesChannel(context.Background(), 7) + generated <- "final aggregate" + + closed := make(chan struct{}) + go func() { + closeGenerated() + close(closed) + }() + + select { + case <-closed: + t.Fatal("closeGenerated returned before mapreduce payload was forwarded") + case <-time.After(20 * time.Millisecond): + } + + select { + case message := <-handler.maprMessages: + generation, payload := decodeGeneratedMessage(message) + if generation != 7 { + t.Fatalf("unexpected generation: %d", generation) + } + if payload != "final aggregate" { + t.Fatalf("unexpected payload: %q", payload) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for forwarded mapreduce payload") + } + + select { + case <-closed: + case <-time.After(time.Second): + t.Fatal("timed out waiting for closeGenerated to finish") + } +} + func TestBaseHandlerReadDropsStaleLine(t *testing.T) { handler := newGenerationTestHandler(4) |
