diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-10 23:01:42 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-10 23:01:42 +0200 |
| commit | 27798b61d6ac4b14ea5129ac28131302a6c5cb30 (patch) | |
| tree | 198b623dd3ae68a4b61160ed3f48ffb4f118d17d | |
| parent | 064d967df9629acd16a5c1cd3a1007e22071f86c (diff) | |
tui: hide stream buffer behind source interface (task 428)
| -rw-r--r-- | internal/ior.go | 17 | ||||
| -rw-r--r-- | internal/tui/tui.go | 4 | ||||
| -rw-r--r-- | internal/tui/tui_test.go | 22 |
3 files changed, 34 insertions, 9 deletions
diff --git a/internal/ior.go b/internal/ior.go index 7b8f68a..8258f01 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -51,6 +51,11 @@ type libbpfTracepointModule struct { module *bpf.Module } +type streamEventSink interface { + eventstream.Source + Push(eventstream.StreamEvent) +} + func (m libbpfTracepointModule) GetProgram(progName string) (probemanager.Program, error) { prog, err := m.module.GetProgram(progName) if err != nil { @@ -180,14 +185,20 @@ func tuiTraceStarterFromRunTrace( applyTraceScopeFromGlobalFilter(&cfg, filter) } engine := statsengine.NewEngine(64) - streamBuf := eventstream.NewRingBuffer() + streamBuf := streamEventSink(eventstream.NewRingBuffer()) + streamSource := eventstream.Source(streamBuf) liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { if persistent := bindings.StreamBuffer(); persistent != nil { - streamBuf = persistent + streamSource = persistent + if sink, ok := persistent.(streamEventSink); ok { + streamBuf = sink + } else { + return fmt.Errorf("runtime stream source does not support event pushes") + } } bindings.SetDashboardSnapshotSource(engine) - bindings.SetEventStreamSource(streamBuf) + bindings.SetEventStreamSource(streamSource) bindings.SetLiveTrie(liveTrie) } streamEvents := make(chan eventstream.StreamEvent, 4096) diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 4265cbb..71b5cf9 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -63,7 +63,7 @@ type TraceRuntimeBindings interface { SetEventStreamSource(source eventstream.Source) SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) SetProbeManager(manager ProbeManager) - StreamBuffer() *eventstream.RingBuffer + StreamBuffer() eventstream.Source } type runtimeBindingsContextKey struct{} @@ -103,7 +103,7 @@ func (r *runtimeBindings) SetEventStreamSource(source eventstream.Source) { r.mu.Unlock() } -func (r *runtimeBindings) StreamBuffer() *eventstream.RingBuffer { +func (r *runtimeBindings) StreamBuffer() eventstream.Source { r.mu.RLock() defer r.mu.RUnlock() return r.streamBuffer diff --git a/internal/tui/tui_test.go b/internal/tui/tui_test.go index ea2c6b0..361f69e 100644 --- a/internal/tui/tui_test.go +++ b/internal/tui/tui_test.go @@ -36,6 +36,20 @@ func (f fakeProbeManager) States() []probemanager.ProbeState { return f.states } func (f fakeProbeManager) Toggle(string) error { return nil } func (f fakeProbeManager) ActiveCount() (int, int) { return len(f.states), len(f.states) } +type testStreamSink interface { + eventstream.Source + Push(eventstream.StreamEvent) +} + +func requireTestStreamSink(t *testing.T, source eventstream.Source) testStreamSink { + t.Helper() + sink, ok := source.(testStreamSink) + if !ok { + t.Fatalf("expected stream source to support Push, got %T", source) + } + return sink +} + func TestTraceFiltersContextRoundTripClonesPayload(t *testing.T) { original := globalfilter.Filter{ Comm: &globalfilter.StringFilter{Pattern: "nginx"}, @@ -430,7 +444,7 @@ func TestRuntimeBindingsStoreAndExposeLiveTrie(t *testing.T) { func TestRuntimeBindingsProvidePersistentStreamBuffer(t *testing.T) { runtime := newRuntimeBindings() - buffer := runtime.StreamBuffer() + buffer := requireTestStreamSink(t, runtime.StreamBuffer()) if buffer == nil { t.Fatalf("expected persistent stream buffer") } @@ -504,7 +518,7 @@ func TestGlobalFilterApplyPreservesBufferedStreamRowsAcrossRestart(t *testing.T) m.width = 120 m.height = 30 - buffer := m.runtime.StreamBuffer() + buffer := requireTestStreamSink(t, m.runtime.StreamBuffer()) buffer.Push(eventstream.StreamEvent{Seq: 1, Syscall: "read", Comm: "proc", PID: 1, TID: 1, FileName: "/tmp/read"}) buffer.Push(eventstream.StreamEvent{Seq: 2, Syscall: "write", Comm: "proc", PID: 1, TID: 2, FileName: "/tmp/write"}) m.dashboard.SetStreamSource(buffer) @@ -840,7 +854,7 @@ func TestSelectPIDKeyReturnsToFreshPickerAndStopsTrace(t *testing.T) { func TestPidSelectedClearsPersistentStreamBuffer(t *testing.T) { m := NewModelWithConfig(flags.Config{PidFilter: -1, TidFilter: -1, TUIExportEnable: true}, -1, func(context.Context) error { return nil }) - m.runtime.StreamBuffer().Push(eventstream.StreamEvent{Seq: 1, Syscall: "read"}) + requireTestStreamSink(t, m.runtime.StreamBuffer()).Push(eventstream.StreamEvent{Seq: 1, Syscall: "read"}) next, _ := m.Update(PidSelectedMsg{Pid: 42}) m = next.(Model) @@ -1012,7 +1026,7 @@ func TestRunExportCmdCSVWritesFilteredStreamSnapshot(t *testing.T) { m.screen = ScreenDashboard m.attaching = false - buffer := m.runtime.StreamBuffer() + buffer := requireTestStreamSink(t, m.runtime.StreamBuffer()) buffer.Push(eventstream.StreamEvent{Seq: 1, Comm: "firefox", PID: 10, TID: 100, Syscall: "read", FileName: "/tmp/a"}) buffer.Push(eventstream.StreamEvent{Seq: 2, Comm: "bash", PID: 11, TID: 110, Syscall: "write", FileName: "/tmp/b"}) m.setGlobalFilter(globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "firefox"}}) |
