diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 07:54:56 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 07:54:56 +0200 |
| commit | 16d7d82b9583b0edec8358fd94f6baf25ce8e01d (patch) | |
| tree | 7518b75f6bee9bef099be7a3ed209464020a5075 /internal | |
| parent | 61e9c8a64429a762da263d4a4bd9da1f32ffdd38 (diff) | |
perf: remove tui stream relay channel
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/bench_pipeline_test.go | 15 | ||||
| -rw-r--r-- | internal/ior.go | 17 |
2 files changed, 3 insertions, 29 deletions
diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go index ad4edf1..b520c9c 100644 --- a/internal/bench_pipeline_test.go +++ b/internal/bench_pipeline_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "path/filepath" - "sync" "testing" "ior/internal/benchutil" @@ -202,16 +201,6 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n streamBuf := eventstream.NewRingBuffer() streamSeq := eventstream.NewSequencer(0) liveTrie := flamegraph.NewLiveTrie([]string{"comm", "tracepoint", "path"}, "count") - streamEvents := make(chan eventstream.StreamEvent, events) - - var streamWG sync.WaitGroup - streamWG.Add(1) - go func() { - defer streamWG.Done() - for row := range streamEvents { - streamBuf.Push(row) - } - }() recorder := parquet.NewRecorder(parquet.RecorderConfig{}) path := filepath.Join(dir, fmt.Sprintf("tui-%d.parquet", i)) @@ -225,7 +214,7 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n el.printCb = func(ep *event.Pair) { row := eventstream.NewStreamEvent(streamSeq.Next(), ep) engine.Ingest(ep) - streamEvents <- row + streamBuf.Push(row) if recordErr == nil { recordErr = recorder.Record(row, 0) } @@ -237,8 +226,6 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n el.run(context.Background(), rawCh) b.StopTimer() - close(streamEvents) - streamWG.Wait() if recordErr != nil { b.Fatalf("recorder.Record() error = %v", recordErr) } diff --git a/internal/ior.go b/internal/ior.go index 1010445..d88f0e3 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -244,14 +244,6 @@ func tuiTraceStarterFromRunTrace( bindings.SetEventStreamSource(streamSource) bindings.SetLiveTrie(liveTrie) } - streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize) - - go func() { - for ev := range streamEvents { - streamBuf.Push(ev) - } - }() - startedCh := make(chan struct{}) errCh := make(chan error, 1) @@ -264,7 +256,7 @@ func tuiTraceStarterFromRunTrace( } row := eventstream.NewStreamEvent(streamSeq.Next(), ep) engine.Ingest(ep) - streamEvents <- row + streamBuf.Push(row) if recorder != nil { if err := recorder.Record(row, filterEpoch); err != nil { recorderWarningOnce.Do(func() { @@ -280,14 +272,9 @@ func tuiTraceStarterFromRunTrace( ep.Recycle() } el.warningCb = func(message string) { - // Drop warning notifications if the stream channel is saturated. - select { - case streamEvents <- eventstream.NewWarningEvent(streamSeq.Next(), message): - default: - } + streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message)) } }) - close(streamEvents) errCh <- err close(errCh) }() |
