summaryrefslogtreecommitdiff
path: root/internal/mapr/server/turbo_aggregate_test.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
committerPaul Buetow <paul@buetow.org>2026-03-10 19:37:21 +0200
commitf6e23930da2900c43a5389a2e7d1e38d8221a76f (patch)
tree3352cc0d8c0819d5cc58fdf987ed39f87a30a34b /internal/mapr/server/turbo_aggregate_test.go
parent1fc24f9affed5128702e4de80572cac8c82d399e (diff)
Refactor server-side config singleton reads
Diffstat (limited to 'internal/mapr/server/turbo_aggregate_test.go')
-rw-r--r--internal/mapr/server/turbo_aggregate_test.go108
1 files changed, 54 insertions, 54 deletions
diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go
index ec1d6a3..f556f50 100644
--- a/internal/mapr/server/turbo_aggregate_test.go
+++ b/internal/mapr/server/turbo_aggregate_test.go
@@ -25,7 +25,7 @@ func TestTurboAggregateVsRegular(t *testing.T) {
if config.Server == nil {
config.Server = &config.ServerConfig{
MapreduceLogFormat: "default",
- TurboBoostDisable: false,
+ TurboBoostDisable: false,
}
}
if dlog.Server == nil {
@@ -35,10 +35,10 @@ func TestTurboAggregateVsRegular(t *testing.T) {
wg.Add(1)
dlog.Start(ctx, &wg, source.Server)
}
-
+
// Test query
queryStr := `from STATS select count($time),$time,avg($goroutines) from - group by $time order by $time`
-
+
// Test data - DTail MapReduce format
testLines := []string{
"INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1",
@@ -47,23 +47,23 @@ func TestTurboAggregateVsRegular(t *testing.T) {
"INFO|1002-071147|1|stats.go:56|8|10|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1",
"INFO|1002-071147|1|stats.go:56|8|11|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1",
}
-
+
t.Run("TurboAggregate", func(t *testing.T) {
// Create turbo aggregate
- turboAgg, err := NewTurboAggregate(queryStr)
+ turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat)
if err != nil {
t.Fatalf("Failed to create turbo aggregate: %v", err)
}
-
+
// Channel to collect messages
messages := make(chan string, 100)
// Use a cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
-
+
// Start the turbo aggregate
turboAgg.Start(ctx, messages)
-
+
// Process lines
processor := NewTurboAggregateProcessor(turboAgg, "test")
for i, line := range testLines {
@@ -73,25 +73,25 @@ func TestTurboAggregateVsRegular(t *testing.T) {
t.Errorf("Failed to process line %d: %v", i+1, err)
}
}
-
+
// Flush to ensure all data is processed
err = processor.Flush()
if err != nil {
t.Errorf("Failed to flush: %v", err)
}
-
+
// Close the processor to decrement activeProcessors
err = processor.Close()
if err != nil {
t.Errorf("Failed to close processor: %v", err)
}
-
+
// Shutdown and get results
turboAgg.Shutdown()
-
+
// Cancel context to stop background goroutines
cancel()
-
+
// Collect results with timeout
done := make(chan struct{})
var results []string
@@ -101,11 +101,11 @@ func TestTurboAggregateVsRegular(t *testing.T) {
}
close(done)
}()
-
+
// Wait a bit for serialization
time.Sleep(200 * time.Millisecond)
close(messages)
-
+
// Wait for collection to complete with timeout
select {
case <-done:
@@ -113,36 +113,36 @@ func TestTurboAggregateVsRegular(t *testing.T) {
case <-time.After(2 * time.Second):
t.Error("Timeout collecting messages")
}
-
+
t.Logf("Turbo mode processed %d lines", turboAgg.linesProcessed.Load())
t.Logf("Turbo mode results: %d messages", len(results))
for _, r := range results {
t.Logf("Result: %s", r)
}
-
+
// Verify we got results
if len(results) == 0 {
t.Error("Turbo mode produced no results")
}
-
+
// Check line count
if turboAgg.linesProcessed.Load() != uint64(len(testLines)) {
t.Errorf("Expected %d lines processed, got %d", len(testLines), turboAgg.linesProcessed.Load())
}
})
-
+
t.Run("RegularAggregate", func(t *testing.T) {
// Create regular aggregate
- regularAgg, err := NewAggregate(queryStr)
+ regularAgg, err := NewAggregate(queryStr, config.Server.MapreduceLogFormat)
if err != nil {
t.Fatalf("Failed to create regular aggregate: %v", err)
}
-
+
// Channel to collect messages
messages := make(chan string, 100)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
-
+
// Start the regular aggregate in a goroutine
var wg sync.WaitGroup
wg.Add(1)
@@ -150,14 +150,14 @@ func TestTurboAggregateVsRegular(t *testing.T) {
defer wg.Done()
regularAgg.Start(ctx, messages)
}()
-
+
// Give it time to start
time.Sleep(50 * time.Millisecond)
-
+
// Create line channel
lines := make(chan *line.Line, 100)
regularAgg.NextLinesCh <- lines
-
+
// Process lines
for _, lineStr := range testLines {
l := &line.Line{
@@ -167,30 +167,30 @@ func TestTurboAggregateVsRegular(t *testing.T) {
lines <- l
}
close(lines)
-
+
// Wait for processing
time.Sleep(100 * time.Millisecond)
-
+
// Shutdown
regularAgg.Shutdown()
cancel()
-
+
// Wait for the Start goroutine to finish
wg.Wait()
-
+
// Collect results
close(messages)
-
+
var results []string
for msg := range messages {
results = append(results, msg)
}
-
+
t.Logf("Regular mode results: %d messages", len(results))
for _, r := range results {
t.Logf("Result: %s", r)
}
-
+
// Verify we got results
if len(results) == 0 {
t.Error("Regular mode produced no results")
@@ -210,7 +210,7 @@ func TestTurboAggregateConcurrency(t *testing.T) {
if config.Server == nil {
config.Server = &config.ServerConfig{
MapreduceLogFormat: "default",
- TurboBoostDisable: false,
+ TurboBoostDisable: false,
}
}
if dlog.Server == nil {
@@ -220,81 +220,81 @@ func TestTurboAggregateConcurrency(t *testing.T) {
wg.Add(1)
dlog.Start(ctx, &wg, source.Server)
}
-
+
queryStr := `from STATS select count($time),$time from - group by $time`
-
+
// Create turbo aggregate
- turboAgg, err := NewTurboAggregate(queryStr)
+ turboAgg, err := NewTurboAggregate(queryStr, config.Server.MapreduceLogFormat)
if err != nil {
t.Fatalf("Failed to create turbo aggregate: %v", err)
}
-
+
// Channel to collect messages
messages := make(chan string, 1000)
ctx := context.Background()
-
+
// Start the turbo aggregate
turboAgg.Start(ctx, messages)
-
+
// Process multiple "files" concurrently
var wg sync.WaitGroup
numFiles := 10
linesPerFile := 100
-
+
for f := 0; f < numFiles; f++ {
wg.Add(1)
go func(fileNum int) {
defer wg.Done()
-
+
processor := NewTurboAggregateProcessor(turboAgg, "file"+string(rune(fileNum)))
-
+
// Process lines
for i := 0; i < linesPerFile; i++ {
line := "INFO|1002-071143|1|stats.go:56|8|15|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=0|lifetimeConnections=1"
buf := bytes.NewBufferString(line)
_ = processor.ProcessLine(buf, uint64(i+1), "file"+string(rune(fileNum)))
}
-
+
// Flush when file completes
_ = processor.Flush()
-
+
// Close the processor to decrement activeProcessors
_ = processor.Close()
}(f)
}
-
+
// Wait for all files to complete
wg.Wait()
-
+
// Shutdown and get results
turboAgg.Shutdown()
-
+
// Collect results
time.Sleep(200 * time.Millisecond)
close(messages)
-
+
var results []string
for msg := range messages {
if strings.Contains(msg, "1002-071143") {
results = append(results, msg)
}
}
-
+
t.Logf("Processed %d lines total", turboAgg.linesProcessed.Load())
t.Logf("Processed %d files", turboAgg.filesProcessed.Load())
t.Logf("Got %d result messages", len(results))
-
+
// Verify line count
expectedLines := uint64(numFiles * linesPerFile)
if turboAgg.linesProcessed.Load() != expectedLines {
t.Errorf("Expected %d lines processed, got %d", expectedLines, turboAgg.linesProcessed.Load())
}
-
+
// Verify file count (may be higher if test was run multiple times)
if turboAgg.filesProcessed.Load() < uint64(numFiles) {
t.Errorf("Expected at least %d files processed, got %d", numFiles, turboAgg.filesProcessed.Load())
}
-
+
// Parse result to check count
foundExpectedCount := false
for _, result := range results {
@@ -306,8 +306,8 @@ func TestTurboAggregateConcurrency(t *testing.T) {
break
}
}
-
+
if !foundExpectedCount {
t.Error("Did not find expected count of 1000 in results")
}
-} \ No newline at end of file
+}