diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-04 12:01:34 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-04 12:01:34 +0300 |
| commit | cd8466c2397361a2e4d6b236ac2dd9f9b76ffa49 (patch) | |
| tree | 8ca842cfabf8123e7fd3abefdfe2ba0d1db02959 | |
| parent | 4e7abc300e4c4607511a781ac5f67b44f00a7644 (diff) | |
fix: remove unnecessary delays in turbo mode for serverless operation
In serverless mode (when dcat runs locally), data is written directly to stdout
and doesn't need network transmission delays. This fix eliminates the 500ms+
exit delay by skipping unnecessary sleep calls when running in serverless mode.
Changes:
- Skip 500ms wait in readFiles() when serverless
- Document the 50ms wait in readWithTurboProcessor(), which was already skipped when serverless (comment-only change)
- Skip aggregate serialization waits when serverless
- Add the turbo benchmark test (internal/clients/turbo_benchmark_test.go, new file) with its compilation errors fixed
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
| -rw-r--r-- | internal/clients/turbo_benchmark_test.go | 231 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 30 |
2 files changed, 251 insertions, 10 deletions
diff --git a/internal/clients/turbo_benchmark_test.go b/internal/clients/turbo_benchmark_test.go new file mode 100644 index 0000000..0021393 --- /dev/null +++ b/internal/clients/turbo_benchmark_test.go @@ -0,0 +1,231 @@ +package clients + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/source" + "sync" +) + +func setupBenchmarkData(b *testing.B, lines int) string { + b.Helper() + + tmpDir := b.TempDir() + testFile := filepath.Join(tmpDir, "benchmark_data.log") + + f, err := os.Create(testFile) + if err != nil { + b.Fatalf("Failed to create test file: %v", err) + } + defer f.Close() + + // Create test data + for i := 0; i < lines; i++ { + line := fmt.Sprintf("INFO|1002-071143|1|test.go:%d|8|%d|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=%d|lifetimeConnections=%d|pattern=test-%d|data=%s\n", + i%100, i%50, i%10, i, i%5, "some-test-data-that-makes-the-line-longer") + f.WriteString(line) + } + + return testFile +} + +func BenchmarkDGrepTurboEnabled(b *testing.B) { + benchmarkDGrep(b, false) +} + +func BenchmarkDGrepTurboDisabled(b *testing.B) { + benchmarkDGrep(b, true) +} + +func benchmarkDGrep(b *testing.B, disableTurbo bool) { + // Setup config + config.Server = &config.ServerConfig{ + TurboBoostDisable: disableTurbo, + MaxConcurrentCats: 10, + MaxConcurrentTails: 50, + MaxLineLength: 1024 * 1024, + } + + config.Common = &config.CommonConfig{ + Logger: "none", + LogLevel: "error", + } + + config.Client = &config.ClientConfig{ + TermColorsEnable: false, + } + + // Initialize logging + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) + dlog.Start(ctx, wg, source.Client) + + // Create test data + testFile := setupBenchmarkData(b, 100000) // 100k lines + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Create grep client + args := 
config.Args{ + ServersStr: "serverless", + QueryStr: "", + What: testFile, + RegexStr: "pattern=test-1", + Serverless: true, + Plain: true, + } + + client, err := NewGrepClient(args) + if err != nil { + b.Fatalf("Failed to create grep client: %v", err) + } + + // Capture output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + // Run grep + statusCh := make(chan int, 1) + go func() { + status := client.Start(ctx, nil) // nil for statsCh + statusCh <- status + }() + + // Wait for completion or timeout + select { + case status := <-statusCh: + if status != 0 { + b.Errorf("Grep failed with status: %d", status) + } + case <-time.After(30 * time.Second): + b.Error("Grep timed out") + } + + // Restore stdout + w.Close() + os.Stdout = oldStdout + + // Read captured output + var buf bytes.Buffer + buf.ReadFrom(r) + } +} + +// Benchmark with different file sizes +func BenchmarkDGrepTurboSmallFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 1000) // 1k lines +} + +func BenchmarkDGrepTurboDisabledSmallFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 1000) // 1k lines +} + +func BenchmarkDGrepTurboMediumFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 50000) // 50k lines +} + +func BenchmarkDGrepTurboDisabledMediumFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 50000) // 50k lines +} + +func BenchmarkDGrepTurboLargeFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 500000) // 500k lines +} + +func BenchmarkDGrepTurboDisabledLargeFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 500000) // 500k lines +} + +func benchmarkDGrepWithSize(b *testing.B, disableTurbo bool, lines int) { + // Setup config + config.Server = &config.ServerConfig{ + TurboBoostDisable: disableTurbo, + MaxConcurrentCats: 10, + MaxConcurrentTails: 50, + MaxLineLength: 1024 * 1024, + } + + config.Common = &config.CommonConfig{ + Logger: "none", + LogLevel: "error", + } + + config.Client = &config.ClientConfig{ + TermColorsEnable: false, + } + + // 
Initialize logging + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) + dlog.Start(ctx, wg, source.Client) + + // Create test data + testFile := setupBenchmarkData(b, lines) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Create grep client + args := config.Args{ + ServersStr: "serverless", + QueryStr: "", + What: testFile, + RegexStr: "pattern=test-1", + Serverless: true, + Plain: true, + } + + client, err := NewGrepClient(args) + if err != nil { + b.Fatalf("Failed to create grep client: %v", err) + } + + // Capture output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + // Run grep + statusCh := make(chan int, 1) + go func() { + status := client.Start(ctx, nil) // nil for statsCh + statusCh <- status + }() + + // Wait for completion or timeout + select { + case status := <-statusCh: + if status != 0 { + b.Errorf("Grep failed with status: %d", status) + } + case <-time.After(30 * time.Second): + b.Error("Grep timed out") + } + + // Restore stdout + w.Close() + os.Stdout = oldStdout + + // Read captured output + var buf bytes.Buffer + buf.ReadFrom(r) + } + + // Report custom metrics + b.ReportMetric(float64(lines), "lines/op") + b.ReportMetric(float64(lines)/b.Elapsed().Seconds(), "lines/sec") +}
\ No newline at end of file diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 7037e5f..3294bdd 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -141,16 +141,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, // Wait to ensure all data is transmitted // This is especially important when files are queued due to concurrency limits - waitTime := 500 * time.Millisecond - if len(paths) > 10 { - // For many files, wait proportionally longer - waitTime = time.Duration(len(paths)*10) * time.Millisecond - if waitTime > 2*time.Second { - waitTime = 2 * time.Second + // In serverless mode, data is written directly to stdout, so no wait is needed + if !r.server.serverless { + waitTime := 500 * time.Millisecond + if len(paths) > 10 { + // For many files, wait proportionally longer + waitTime = time.Duration(len(paths)*10) * time.Millisecond + if waitTime > 2*time.Second { + waitTime = 2 * time.Second + } } + dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime) + time.Sleep(waitTime) } - dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime) - time.Sleep(waitTime) } } @@ -177,12 +180,18 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC r.server.turboAggregate.Serialize(context.Background()) // Give more time for serialization to complete // This is critical when processing many files concurrently - time.Sleep(500 * time.Millisecond) + // In serverless mode, serialization is synchronous, so no wait needed + if !r.server.serverless { + time.Sleep(500 * time.Millisecond) + } } // Double-check that we really have no pending work // In turbo mode, there might be a race condition - time.Sleep(10 * time.Millisecond) + // In serverless mode, no need for this delay + if !r.server.serverless { + time.Sleep(10 * time.Millisecond) + } finalPending := 
atomic.LoadInt32(&r.server.pendingFiles) finalActive := atomic.LoadInt32(&r.server.activeCommands) if finalPending == 0 && finalActive == 0 { @@ -451,6 +460,7 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L // Give time for data to be transmitted // This is crucial for integration tests to ensure all data is sent + // Skip this delay in serverless mode since data is written directly to stdout if !r.server.serverless { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission") time.Sleep(50 * time.Millisecond) |
