summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-10 23:01:42 +0200
committerPaul Buetow <paul@buetow.org>2026-03-10 23:01:42 +0200
commit27798b61d6ac4b14ea5129ac28131302a6c5cb30 (patch)
tree198b623dd3ae68a4b61160ed3f48ffb4f118d17d
parent064d967df9629acd16a5c1cd3a1007e22071f86c (diff)
tui: hide stream buffer behind source interface (task 428)
-rw-r--r--internal/ior.go17
-rw-r--r--internal/tui/tui.go4
-rw-r--r--internal/tui/tui_test.go22
3 files changed, 34 insertions, 9 deletions
diff --git a/internal/ior.go b/internal/ior.go
index 7b8f68a..8258f01 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -51,6 +51,11 @@ type libbpfTracepointModule struct {
module *bpf.Module
}
+type streamEventSink interface {
+ eventstream.Source
+ Push(eventstream.StreamEvent)
+}
+
func (m libbpfTracepointModule) GetProgram(progName string) (probemanager.Program, error) {
prog, err := m.module.GetProgram(progName)
if err != nil {
@@ -180,14 +185,20 @@ func tuiTraceStarterFromRunTrace(
applyTraceScopeFromGlobalFilter(&cfg, filter)
}
engine := statsengine.NewEngine(64)
- streamBuf := eventstream.NewRingBuffer()
+ streamBuf := streamEventSink(eventstream.NewRingBuffer())
+ streamSource := eventstream.Source(streamBuf)
liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField)
if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
if persistent := bindings.StreamBuffer(); persistent != nil {
- streamBuf = persistent
+ streamSource = persistent
+ if sink, ok := persistent.(streamEventSink); ok {
+ streamBuf = sink
+ } else {
+ return fmt.Errorf("runtime stream source does not support event pushes")
+ }
}
bindings.SetDashboardSnapshotSource(engine)
- bindings.SetEventStreamSource(streamBuf)
+ bindings.SetEventStreamSource(streamSource)
bindings.SetLiveTrie(liveTrie)
}
streamEvents := make(chan eventstream.StreamEvent, 4096)
diff --git a/internal/tui/tui.go b/internal/tui/tui.go
index 4265cbb..71b5cf9 100644
--- a/internal/tui/tui.go
+++ b/internal/tui/tui.go
@@ -63,7 +63,7 @@ type TraceRuntimeBindings interface {
SetEventStreamSource(source eventstream.Source)
SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource)
SetProbeManager(manager ProbeManager)
- StreamBuffer() *eventstream.RingBuffer
+ StreamBuffer() eventstream.Source
}
type runtimeBindingsContextKey struct{}
@@ -103,7 +103,7 @@ func (r *runtimeBindings) SetEventStreamSource(source eventstream.Source) {
r.mu.Unlock()
}
-func (r *runtimeBindings) StreamBuffer() *eventstream.RingBuffer {
+func (r *runtimeBindings) StreamBuffer() eventstream.Source {
r.mu.RLock()
defer r.mu.RUnlock()
return r.streamBuffer
diff --git a/internal/tui/tui_test.go b/internal/tui/tui_test.go
index ea2c6b0..361f69e 100644
--- a/internal/tui/tui_test.go
+++ b/internal/tui/tui_test.go
@@ -36,6 +36,20 @@ func (f fakeProbeManager) States() []probemanager.ProbeState { return f.states }
func (f fakeProbeManager) Toggle(string) error { return nil }
func (f fakeProbeManager) ActiveCount() (int, int) { return len(f.states), len(f.states) }
+type testStreamSink interface {
+ eventstream.Source
+ Push(eventstream.StreamEvent)
+}
+
+func requireTestStreamSink(t *testing.T, source eventstream.Source) testStreamSink {
+ t.Helper()
+ sink, ok := source.(testStreamSink)
+ if !ok {
+ t.Fatalf("expected stream source to support Push, got %T", source)
+ }
+ return sink
+}
+
func TestTraceFiltersContextRoundTripClonesPayload(t *testing.T) {
original := globalfilter.Filter{
Comm: &globalfilter.StringFilter{Pattern: "nginx"},
@@ -430,7 +444,7 @@ func TestRuntimeBindingsStoreAndExposeLiveTrie(t *testing.T) {
func TestRuntimeBindingsProvidePersistentStreamBuffer(t *testing.T) {
runtime := newRuntimeBindings()
- buffer := runtime.StreamBuffer()
+ buffer := requireTestStreamSink(t, runtime.StreamBuffer())
if buffer == nil {
t.Fatalf("expected persistent stream buffer")
}
@@ -504,7 +518,7 @@ func TestGlobalFilterApplyPreservesBufferedStreamRowsAcrossRestart(t *testing.T)
m.width = 120
m.height = 30
- buffer := m.runtime.StreamBuffer()
+ buffer := requireTestStreamSink(t, m.runtime.StreamBuffer())
buffer.Push(eventstream.StreamEvent{Seq: 1, Syscall: "read", Comm: "proc", PID: 1, TID: 1, FileName: "/tmp/read"})
buffer.Push(eventstream.StreamEvent{Seq: 2, Syscall: "write", Comm: "proc", PID: 1, TID: 2, FileName: "/tmp/write"})
m.dashboard.SetStreamSource(buffer)
@@ -840,7 +854,7 @@ func TestSelectPIDKeyReturnsToFreshPickerAndStopsTrace(t *testing.T) {
func TestPidSelectedClearsPersistentStreamBuffer(t *testing.T) {
m := NewModelWithConfig(flags.Config{PidFilter: -1, TidFilter: -1, TUIExportEnable: true}, -1, func(context.Context) error { return nil })
- m.runtime.StreamBuffer().Push(eventstream.StreamEvent{Seq: 1, Syscall: "read"})
+ requireTestStreamSink(t, m.runtime.StreamBuffer()).Push(eventstream.StreamEvent{Seq: 1, Syscall: "read"})
next, _ := m.Update(PidSelectedMsg{Pid: 42})
m = next.(Model)
@@ -1012,7 +1026,7 @@ func TestRunExportCmdCSVWritesFilteredStreamSnapshot(t *testing.T) {
m.screen = ScreenDashboard
m.attaching = false
- buffer := m.runtime.StreamBuffer()
+ buffer := requireTestStreamSink(t, m.runtime.StreamBuffer())
buffer.Push(eventstream.StreamEvent{Seq: 1, Comm: "firefox", PID: 10, TID: 100, Syscall: "read", FileName: "/tmp/a"})
buffer.Push(eventstream.StreamEvent{Seq: 2, Comm: "bash", PID: 11, TID: 110, Syscall: "write", FileName: "/tmp/b"})
m.setGlobalFilter(globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "firefox"}})