diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 22:44:34 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 22:44:34 +0200 |
| commit | 91296d85e8a6f1aca5beaeeecf648683c83c75bc (patch) | |
| tree | d6bf0b0be51a72d0a597402e84b3664145d8e041 | |
| parent | 1b34e1f2501b8def0a0fb4eae28bf6c19a8adde2 (diff) | |
Fix mapreduce integration drain race
| -rw-r--r-- | Makefile | 3 | ||||
| -rw-r--r-- | internal/server/handlers/generation_output_test.go | 45 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 3 |
3 files changed, 51 insertions, 0 deletions
@@ -65,6 +65,9 @@ test: ${GO} clean -testcache set -e; find . -name '*_test.go' | while read file; do dirname $$file; done | \ sort -u | while read dir; do ${GO} test -tags '${GO_TAGS}' --race -v -failfast $$dir || exit 2; done +test-integration: clean build + ${GO} clean -testcache + DTAIL_INTEGRATION_TEST_RUN_MODE=yes ${GO} test -tags '${GO_TAGS}' --race -count=1 ./integrationtests benchmark: build dtail-tools ./dtail-tools benchmark -mode run benchmark-quick: build dtail-tools diff --git a/internal/server/handlers/generation_output_test.go b/internal/server/handlers/generation_output_test.go index 6020c09..393afac 100644 --- a/internal/server/handlers/generation_output_test.go +++ b/internal/server/handlers/generation_output_test.go @@ -2,8 +2,10 @@ package handlers import ( "bytes" + "context" "strings" "testing" + "time" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/line" @@ -48,6 +50,49 @@ func TestBaseHandlerReadDropsStaleMaprMessage(t *testing.T) { } } +func TestGeneratedMaprMessagesChannelCloseWaitsForForwarding(t *testing.T) { + handler := &ServerHandler{ + baseHandler: baseHandler{ + done: internal.NewDone(), + maprMessages: make(chan string), + }, + } + + generated, closeGenerated := handler.newGeneratedMaprMessagesChannel(context.Background(), 7) + generated <- "final aggregate" + + closed := make(chan struct{}) + go func() { + closeGenerated() + close(closed) + }() + + select { + case <-closed: + t.Fatal("closeGenerated returned before mapreduce payload was forwarded") + case <-time.After(20 * time.Millisecond): + } + + select { + case message := <-handler.maprMessages: + generation, payload := decodeGeneratedMessage(message) + if generation != 7 { + t.Fatalf("unexpected generation: %d", generation) + } + if payload != "final aggregate" { + t.Fatalf("unexpected payload: %q", payload) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for forwarded mapreduce payload") + } + + select { + case <-closed: + case <-time.After(time.Second): + t.Fatal("timed out waiting for closeGenerated to finish") + } +} + func TestBaseHandlerReadDropsStaleLine(t *testing.T) { handler := newGenerationTestHandler(4) diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index ef64468..cd930f9 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -211,7 +211,9 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, generation uint64) (chan string, func()) { maprMessages := make(chan string, 16) + done := make(chan struct{}) go func() { + defer close(done) for { select { case message, ok := <-maprMessages: @@ -228,5 +230,6 @@ func (h *ServerHandler) newGeneratedMaprMessagesChannel(ctx context.Context, gen }() return maprMessages, func() { close(maprMessages) + <-done } } |
