summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile3
-rw-r--r--internal/server/handlers/generation_output_test.go45
-rw-r--r--internal/server/handlers/serverhandler.go3
3 files changed, 51 insertions, 0 deletions
diff --git a/Makefile b/Makefile
index 659c9d1..26e63aa 100644
--- a/Makefile
+++ b/Makefile
@@ -65,6 +65,9 @@ test:
${GO} clean -testcache
set -e; find . -name '*_test.go' | while read file; do dirname $$file; done | \
sort -u | while read dir; do ${GO} test -tags '${GO_TAGS}' --race -v -failfast $$dir || exit 2; done
+test-integration: clean build
+ ${GO} clean -testcache
+ DTAIL_INTEGRATION_TEST_RUN_MODE=yes ${GO} test -tags '${GO_TAGS}' --race -count=1 ./integrationtests
benchmark: build dtail-tools
./dtail-tools benchmark -mode run
benchmark-quick: build dtail-tools
diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go
index 6020c09..393afac 100644
--- a/internal/server/handlers/generation_output_test.go
+++ b/internal/server/handlers/generation_output_test.go
@@ -2,8 +2,10 @@ package handlers
import (
"bytes"
+ "context"
"strings"
"testing"
+ "time"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/line"
@@ -48,6 +50,49 @@ func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) {
}
}
+func TestGeneratedMaprMessagesChannelCloseWaitsForForwarding(t *testing.T) {
+ handler := &ServerHandler{
+ baseHandler: baseHandler{
+ done: internal.NewDone(),
+ maprMessages: make(chan string),
+ },
+ }
+
+ generated, closeGenerated := handler.newGeneratedMaprMessagesChannel(context.Background(), 7)
+ generated <- "final aggregate"
+
+ closed := make(chan struct{})
+ go func() {
+ closeGenerated()
+ close(closed)
+ }()
+
+ select {
+ case <-closed:
+ t.Fatal("closeGenerated returned before mapreduce payload was forwarded")
+ case <-time.After(20 * time.Millisecond):
+ }
+
+ select {
+ case message := <-handler.maprMessages:
+ generation, payload := decodeGeneratedMessage(message)
+ if generation != 7 {
+ t.Fatalf("unexpected generation: %d", generation)
+ }
+ if payload != "final aggregate" {
+ t.Fatalf("unexpected payload: %q", payload)
+ }
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for forwarded mapreduce payload")
+ }
+
+ select {
+ case <-closed:
+ case <-time.After(time.Second):
+ t.Fatal("timed out waiting for closeGenerated to finish")
+ }
+}
+
func TestBaseHandlerReadDropsStaleLine(t *testing.T) {
handler := newGenerationTestHandler(4)
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index ef64468..cd930f9 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -211,7 +211,9 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont
func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, generation uint64) (chan string, func()) {
maprMessages := make(chan string, 16)
+ done := make(chan struct{})
go func() {
+ defer close(done)
for {
select {
case message, ok := <-maprMessages:
@@ -228,5 +230,6 @@ func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, gen
}()
return maprMessages, func() {
close(maprMessages)
+ <-done
}
}