diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-10 19:37:21 +0200 |
| commit | f6e23930da2900c43a5389a2e7d1e38d8221a76f (patch) | |
| tree | 3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal/mapr/server/turbo_aggregate_test.go | |
| parent | 1fc24f9affed5128702e4de80572cac8c82d399e (diff) | |
Refactor server-side config singleton reads
Diffstat (limited to 'internal/mapr/server/turbo_aggregate_test.go')
| -rw-r--r-- | internal/mapr/server/turbo_aggregate_test.go | 108 |
1 files changed, 54 insertions, 54 deletions
diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go index ec1d6a3..f556f50 100644 --- a/internal/mapr/server/turbo_aggregate_test.go +++ b/internal/mapr/server/turbo_aggregate_test.go @@ -25,7 +25,7 @@ func TestTurboAggregateVsRegular(t *testing.T) { if config.Server == nil { config.Server = &config.ServerConfig{ MapreduceLogFormat: "default", - TurboBoostDisable: false, + TurboBoostDisable: false, } } if dlog.Server == nil { @@ -35,10 +35,10 @@ func TestTurboAggregateVsRegular(t *testing.T) { wg.Add(1) dlog.Start(ctx, &wg, source.Server) } - + // Test query queryStr := `from STATS select count($time),$time,avg($goroutines) from - group by $time order by $time` - + // Test data - DTail MapReduce format testLines := []string{ "INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1", @@ -47,23 +47,23 @@ func TestTurboAggregateVsRegular(t *testing.T) { "INFO|1002-071147|1|stats.go:56|8|10|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1", "INFO|1002-071147|1|stats.go:56|8|11|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1", } - + t.Run("TurboAggregate", func(t *testing.T) { // Create turbo aggregate - turboAgg, err := NewTurboAggregate(queryStr) + turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat) if err != nil { t.Fatalf("Failed to create turbo aggregate: %v", err) } - + // Channel to collect messages messages := make(chan string, 100) // Use a cancellable context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - + // Start the turbo aggregate turboAgg.Start(ctx, messages) - + // Process lines processor := NewTurboAggregateProcessor(turboAgg, "test") for i, line := range testLines { @@ -73,25 +73,25 @@ func TestTurboAggregateVsRegular(t *testing.T) { t.Errorf("Failed to process line %d: %v", i+1, err) } } - + // Flush to ensure all data is processed err = processor.Flush() if err != nil { t.Errorf("Failed to flush: %v", err) } - + // Close the processor to decrement activeProcessors err = processor.Close() if err != nil { t.Errorf("Failed to close processor: %v", err) } - + // Shutdown and get results turboAgg.Shutdown() - + // Cancel context to stop background goroutines cancel() - + // Collect results with timeout done := make(chan struct{}) var results []string @@ -101,11 +101,11 @@ func TestTurboAggregateVsRegular(t *testing.T) { } close(done) }() - + // Wait a bit for serialization time.Sleep(200 * time.Millisecond) close(messages) - + // Wait for collection to complete with timeout select { case <-done: @@ -113,36 +113,36 @@ func TestTurboAggregateVsRegular(t *testing.T) { case <-time.After(2 * time.Second): t.Error("Timeout collecting messages") } - + t.Logf("Turbo mode processed %d lines", turboAgg.linesProcessed.Load()) t.Logf("Turbo mode results: %d messages", len(results)) for _, r := range results { t.Logf("Result: %s", r) } - + // Verify we got results if len(results) == 0 { t.Error("Turbo mode produced no results") } - + // Check line count if turboAgg.linesProcessed.Load() != uint64(len(testLines)) { t.Errorf("Expected %d lines processed, got %d", len(testLines), turboAgg.linesProcessed.Load()) } }) - + t.Run("RegularAggregate", func(t *testing.T) { // Create regular aggregate - regularAgg, err := NewAggregate(queryStr) + regularAgg, err := NewAggregate(queryStr, config.Server.MapreduceLogFormat) if err != nil { t.Fatalf("Failed to create regular aggregate: %v", err) } - + // Channel to collect messages messages := make(chan string, 100) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - + // Start the regular aggregate in a goroutine var wg sync.WaitGroup wg.Add(1) @@ -150,14 +150,14 @@ func TestTurboAggregateVsRegular(t *testing.T) { defer wg.Done() regularAgg.Start(ctx, messages) }() - + // Give it time to start time.Sleep(50 * time.Millisecond) - + // Create line channel lines := make(chan *line.Line, 100) regularAgg.NextLinesCh <- lines - + // Process lines for _, lineStr := range testLines { l := &line.Line{ @@ -167,30 +167,30 @@ func TestTurboAggregateVsRegular(t *testing.T) { lines <- l } close(lines) - + // Wait for processing time.Sleep(100 * time.Millisecond) - + // Shutdown regularAgg.Shutdown() cancel() - + // Wait for the Start goroutine to finish wg.Wait() - + // Collect results close(messages) - + var results []string for msg := range messages { results = append(results, msg) } - + t.Logf("Regular mode results: %d messages", len(results)) for _, r := range results { t.Logf("Result: %s", r) } - + // Verify we got results if len(results) == 0 { t.Error("Regular mode produced no results") @@ -210,7 +210,7 @@ func TestTurboAggregateConcurrency(t *testing.T) { if config.Server == nil { config.Server = &config.ServerConfig{ MapreduceLogFormat: "default", - TurboBoostDisable: false, + TurboBoostDisable: false, } } if dlog.Server == nil { @@ -220,81 +220,81 @@ func TestTurboAggregateConcurrency(t *testing.T) { wg.Add(1) dlog.Start(ctx, &wg, source.Server) } - + queryStr := `from STATS select count($time),$time from - group by $time` - + // Create turbo aggregate - turboAgg, err := NewTurboAggregate(queryStr) + turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat) if err != nil { t.Fatalf("Failed to create turbo aggregate: %v", err) } - + // Channel to collect messages messages := make(chan string, 1000) ctx := context.Background() - + // Start the turbo aggregate turboAgg.Start(ctx, messages) - + // Process multiple "files" concurrently var wg sync.WaitGroup numFiles := 10 linesPerFile := 100 - + for f := 0; f < numFiles; f++ { wg.Add(1) go func(fileNum int) { defer wg.Done() - + processor := NewTurboAggregateProcessor(turboAgg, "file"+string(rune(fileNum))) - + // Process lines for i := 0; i < linesPerFile; i++ { line := "INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1" buf := bytes.NewBufferString(line) _ = processor.ProcessLine(buf, uint64(i+1), "file"+string(rune(fileNum))) } - + // Flush when file completes _ = processor.Flush() - + // Close the processor to decrement activeProcessors _ = processor.Close() }(f) } - + // Wait for all files to complete wg.Wait() - + // Shutdown and get results turboAgg.Shutdown() - + // Collect results time.Sleep(200 * time.Millisecond) close(messages) - + var results []string for msg := range messages { if strings.Contains(msg, "1002-071143") { results = append(results, msg) } } - + t.Logf("Processed %d lines total", turboAgg.linesProcessed.Load()) t.Logf("Processed %d files", turboAgg.filesProcessed.Load()) t.Logf("Got %d result messages", len(results)) - + // Verify line count expectedLines := uint64(numFiles * linesPerFile) if turboAgg.linesProcessed.Load() != expectedLines { t.Errorf("Expected %d lines processed, got %d", expectedLines, turboAgg.linesProcessed.Load()) } - + // Verify file count (may be higher if test was run multiple times) if turboAgg.filesProcessed.Load() < uint64(numFiles) { t.Errorf("Expected at least %d files processed, got %d", numFiles, turboAgg.filesProcessed.Load()) } - + // Parse result to check count foundExpectedCount := false for _, result := range results { @@ -306,8 +306,8 @@ func TestTurboAggregateConcurrency(t *testing.T) { break } } - + if !foundExpectedCount { t.Error("Did not find expected count of 1000 in results") } -}
\ No newline at end of file +} |
