summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 07:54:56 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 07:54:56 +0200
commit16d7d82b9583b0edec8358fd94f6baf25ce8e01d (patch)
tree7518b75f6bee9bef099be7a3ed209464020a5075 /internal
parent61e9c8a64429a762da263d4a4bd9da1f32ffdd38 (diff)
perf: remove tui stream relay channel
Diffstat (limited to 'internal')
-rw-r--r--internal/bench_pipeline_test.go15
-rw-r--r--internal/ior.go17
2 files changed, 3 insertions, 29 deletions
diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go
index ad4edf1..b520c9c 100644
--- a/internal/bench_pipeline_test.go
+++ b/internal/bench_pipeline_test.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"path/filepath"
- "sync"
"testing"
"ior/internal/benchutil"
@@ -202,16 +201,6 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n
streamBuf := eventstream.NewRingBuffer()
streamSeq := eventstream.NewSequencer(0)
liveTrie := flamegraph.NewLiveTrie([]string{"comm", "tracepoint", "path"}, "count")
- streamEvents := make(chan eventstream.StreamEvent, events)
-
- var streamWG sync.WaitGroup
- streamWG.Add(1)
- go func() {
- defer streamWG.Done()
- for row := range streamEvents {
- streamBuf.Push(row)
- }
- }()
recorder := parquet.NewRecorder(parquet.RecorderConfig{})
path := filepath.Join(dir, fmt.Sprintf("tui-%d.parquet", i))
@@ -225,7 +214,7 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n
el.printCb = func(ep *event.Pair) {
row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
engine.Ingest(ep)
- streamEvents <- row
+ streamBuf.Push(row)
if recordErr == nil {
recordErr = recorder.Record(row, 0)
}
@@ -237,8 +226,6 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n
el.run(context.Background(), rawCh)
b.StopTimer()
- close(streamEvents)
- streamWG.Wait()
if recordErr != nil {
b.Fatalf("recorder.Record() error = %v", recordErr)
}
diff --git a/internal/ior.go b/internal/ior.go
index 1010445..d88f0e3 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -244,14 +244,6 @@ func tuiTraceStarterFromRunTrace(
bindings.SetEventStreamSource(streamSource)
bindings.SetLiveTrie(liveTrie)
}
- streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize)
-
- go func() {
- for ev := range streamEvents {
- streamBuf.Push(ev)
- }
- }()
-
startedCh := make(chan struct{})
errCh := make(chan error, 1)
@@ -264,7 +256,7 @@ func tuiTraceStarterFromRunTrace(
}
row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
engine.Ingest(ep)
- streamEvents <- row
+ streamBuf.Push(row)
if recorder != nil {
if err := recorder.Record(row, filterEpoch); err != nil {
recorderWarningOnce.Do(func() {
@@ -280,14 +272,9 @@ func tuiTraceStarterFromRunTrace(
ep.Recycle()
}
el.warningCb = func(message string) {
- // Drop warning notifications if the stream channel is saturated.
- select {
- case streamEvents <- eventstream.NewWarningEvent(streamSeq.Next(), message):
- default:
- }
+ streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message))
}
})
- close(streamEvents)
errCh <- err
close(errCh)
}()