summaryrefslogtreecommitdiff
path: root/internal/server/handlers/generation_output_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/generation_output_test.go')
-rw-r--r--internal/server/handlers/generation_output_test.go45
1 files changed, 45 insertions, 0 deletions
diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go
index 6020c09..393afac 100644
--- a/internal/server/handlers/generation_output_test.go
+++ b/internal/server/handlers/generation_output_test.go
@@ -2,8 +2,10 @@ package handlers
import (
"bytes"
+ "context"
"strings"
"testing"
+ "time"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/line"
@@ -48,6 +50,49 @@ func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) {
}
}
+func TestGeneratedMaprMessagesChannelCloseWaitsForForwarding(t *testing.T) {
+ handler := &ServerHandler{
+ baseHandler: baseHandler{
+ done: internal.NewDone(),
+ maprMessages: make(chan string),
+ },
+ }
+
+ generated, closeGenerated := handler.newGeneratedMaprMessagesChannel(context.Background(), 7)
+ generated <- "final aggregate"
+
+ closed := make(chan struct{})
+ go func() {
+ closeGenerated()
+ close(closed)
+ }()
+
+ select {
+ case <-closed:
+ t.Fatal("closeGenerated returned before mapreduce payload was forwarded")
+ case <-time.After(20 * time.Millisecond):
+ }
+
+ select {
+ case message := <-handler.maprMessages:
+ generation, payload := decodeGeneratedMessage(message)
+ if generation != 7 {
+ t.Fatalf("unexpected generation: %d", generation)
+ }
+ if payload != "final aggregate" {
+ t.Fatalf("unexpected payload: %q", payload)
+ }
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for forwarded mapreduce payload")
+ }
+
+ select {
+ case <-closed:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for closeGenerated to finish")
+ }
+}
+
func TestBaseHandlerReadDropsStaleLine(t *testing.T) {
handler := newGenerationTestHandler(4)