diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:54:44 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:54:44 +0200 |
| commit | 2e401326d7abf687f2f67537cfe1b7f93d548305 (patch) | |
| tree | 027547b0958d1ef1f236e507ae89dee414af204b | |
| parent | 767c2b54779cbf81b68217c6e83868cffb6a0965 (diff) | |
feat: persist parquet recording across TUI restarts
| -rw-r--r-- | internal/ior.go | 23 | ||||
| -rw-r--r-- | internal/ior_mode_test.go | 175 | ||||
| -rw-r--r-- | internal/tui/tui.go | 41 | ||||
| -rw-r--r-- | internal/tui/tui_test.go | 69 |
4 files changed, 304 insertions, 4 deletions
diff --git a/internal/ior.go b/internal/ior.go index ea44fea..12aab7c 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -20,6 +20,7 @@ import ( "ior/internal/flags" "ior/internal/flamegraph" "ior/internal/globalfilter" + "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" "ior/internal/tracepoints" @@ -197,7 +198,11 @@ func tuiTraceStarterFromRunTrace( engine := statsengine.NewEngine(64) streamBuf := streamEventSink(eventstream.NewRingBuffer()) streamSource := eventstream.Source(streamBuf) + streamSeq := eventstream.NewSequencer(0) liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) + filterEpoch := uint64(0) + var recorderWarningOnce sync.Once + var recorder *parquet.Recorder if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { if persistent := bindings.StreamBuffer(); persistent != nil { streamSource = persistent @@ -207,12 +212,16 @@ func tuiTraceStarterFromRunTrace( return fmt.Errorf("runtime stream source does not support event pushes") } } + if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil { + streamSeq = persistentSeq + } + recorder = bindings.Recorder() + filterEpoch = bindings.FilterEpoch() bindings.SetDashboardSnapshotSource(engine) bindings.SetEventStreamSource(streamSource) bindings.SetLiveTrie(liveTrie) } streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize) - streamSeq := eventstream.NewSequencer(0) go func() { for ev := range streamEvents { @@ -230,8 +239,18 @@ func tuiTraceStarterFromRunTrace( ep.Recycle() return } + row := eventstream.NewStreamEvent(streamSeq.Next(), ep) engine.Ingest(ep) - streamEvents <- eventstream.NewStreamEvent(streamSeq.Next(), ep) + streamEvents <- row + if recorder != nil { + if err := recorder.Record(row, filterEpoch); err != nil { + recorderWarningOnce.Do(func() { + if el.warningCb != nil { + el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err)) + } + }) + } + } liveTrie.Ingest(ep) // Both downstream consumers snapshot the pair synchronously, so // the pooled pair can be recycled immediately afterwards. diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index 8fbc79c..08452bb 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -5,6 +5,9 @@ import ( "context" "encoding/json" "errors" + "io" + "os" + "path/filepath" "testing" "testing/synctest" "time" @@ -13,8 +16,13 @@ import ( "ior/internal/file" "ior/internal/flags" "ior/internal/globalfilter" + "ior/internal/parquet" "ior/internal/tui" + "ior/internal/tui/eventstream" + flamegraphtui "ior/internal/tui/flamegraph" "ior/internal/types" + + parquetgo "github.com/parquet-go/parquet-go" ) func TestShouldRunTraceMode(t *testing.T) { @@ -552,3 +560,170 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) { t.Fatalf("expected context canceled, got %v", err) } } + +func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) { + recorder := parquet.NewRecorder(parquet.RecorderConfig{ + BatchSize: 1, + FlushInterval: time.Hour, + }) + if err := recorder.Start(filepath.Join(t.TempDir(), "trace"), parquet.StartOptions{ + Metadata: parquet.FileMetadata{Mode: "tui"}, + }); err != nil { + t.Fatalf("recorder.Start() error = %v", err) + } + + bindings := &traceRuntimeBindingsStub{ + streamBuffer: eventstream.NewRingBuffer(), + streamSeq: eventstream.NewSequencer(0), + recorder: recorder, + } + base := flags.NewFlags() + base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}} + + runs := [][]*event.Pair{ + { + testTracePair(1, "keep"), + testTracePair(99, "drop"), + }, + { + testTracePair(2, "keep"), + }, + } + runIndex := 0 + starter := tuiTraceStarterFromRunTrace( + base, + func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error { + el := &eventLoop{} + configure(el) + for _, pair := range runs[runIndex] { + el.printCb(pair) + } + runIndex++ + close(started) + return nil + }, + ) + + ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings) + if err := starter(ctx); err != nil { + t.Fatalf("first starter() error = %v", err) + } + waitForStreamRows(t, bindings.streamBuffer, 1) + + bindings.filterEpoch = 1 + if err := starter(ctx); err != nil { + t.Fatalf("second starter() error = %v", err) + } + waitForStreamRows(t, bindings.streamBuffer, 2) + + if err := recorder.Stop(); err != nil { + t.Fatalf("recorder.Stop() error = %v", err) + } + + status := recorder.Status() + if status.LastError != nil { + t.Fatalf("recorder status error = %v, want nil", status.LastError) + } + + got := readRecordedParquet(t, status.Path) + if len(got) != 2 { + t.Fatalf("recorded rows = %d, want 2", len(got)) + } + if got[0].Seq != 1 || got[1].Seq != 2 { + t.Fatalf("recorded seq = %d,%d, want 1,2", got[0].Seq, got[1].Seq) + } + if got[0].FilterEpoch != 0 || got[1].FilterEpoch != 1 { + t.Fatalf("recorded filter epochs = %d,%d, want 0,1", got[0].FilterEpoch, got[1].FilterEpoch) + } + if snapshot := bindings.streamBuffer.Snapshot(); len(snapshot) != 2 { + t.Fatalf("stream buffer rows = %d, want 2", len(snapshot)) + } +} + +type traceRuntimeBindingsStub struct { + streamBuffer *eventstream.RingBuffer + streamSource eventstream.Source + streamSeq *eventstream.Sequencer + recorder *parquet.Recorder + filterEpoch uint64 +} + +func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {} + +func (b *traceRuntimeBindingsStub) SetEventStreamSource(source eventstream.Source) { + b.streamSource = source +} + +func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {} + +func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {} + +func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source { + return b.streamBuffer +} + +func (b *traceRuntimeBindingsStub) Recorder() *parquet.Recorder { + return b.recorder +} + +func (b *traceRuntimeBindingsStub) StreamSequencer() *eventstream.Sequencer { + return b.streamSeq +} + +func (b *traceRuntimeBindingsStub) FilterEpoch() uint64 { + return b.filterEpoch +} + +func testTracePair(seq uint64, comm string) *event.Pair { + enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: seq * 10, Pid: 42, Tid: 84} + exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: seq*10 + 1, Ret: int64(seq), Pid: 42, Tid: 84} + pair := event.NewPair(enter) + pair.ExitEv = exit + pair.File = file.NewFd(int32(seq), "/tmp/test", 0) + pair.Comm = comm + pair.Duration = seq + pair.DurationToPrev = seq + 1 + pair.Bytes = seq + 2 + return pair +} + +func waitForStreamRows(t *testing.T, buffer *eventstream.RingBuffer, want int) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if buffer.Len() == want { + return + } + time.Sleep(time.Millisecond) + } + t.Fatalf("stream buffer len = %d, want %d", buffer.Len(), want) +} + +func readRecordedParquet(t *testing.T, path string) []parquet.Record { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatalf("open parquet %q: %v", path, err) + } + defer f.Close() + + reader := parquetgo.NewGenericReader[parquet.Record](f) + defer reader.Close() + + var rows []parquet.Record + buf := make([]parquet.Record, 4) + for { + n, err := reader.Read(buf) + if n > 0 { + rows = append(rows, buf[:n]...) + } + if err == nil { + continue + } + if err == io.EOF { + return rows + } + t.Fatalf("read parquet rows: %v", err) + } +} diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 434e813..29d1fc8 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -8,10 +8,12 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "ior/internal/flags" "ior/internal/globalfilter" + "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" common "ior/internal/tui/common" @@ -64,6 +66,9 @@ type TraceRuntimeBindings interface { SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) SetProbeManager(manager ProbeManager) StreamBuffer() eventstream.Source + Recorder() *parquet.Recorder + StreamSequencer() *eventstream.Sequencer + FilterEpoch() uint64 } type runtimeBindingsContextKey struct{} @@ -75,8 +80,11 @@ type runtimeBindings struct { snapshotSource SnapshotSource streamSource eventstream.Source streamBuffer *eventstream.RingBuffer + streamSeq *eventstream.Sequencer + recorder *parquet.Recorder liveTrieSource flamegraphtui.LiveTrieSource probeManager ProbeManager + filterEpoch atomic.Uint64 } type traceFilters struct { @@ -88,6 +96,8 @@ func newRuntimeBindings() *runtimeBindings { return &runtimeBindings{ streamSource: streamBuffer, streamBuffer: streamBuffer, + streamSeq: eventstream.NewSequencer(0), + recorder: parquet.NewRecorder(parquet.RecorderConfig{}), } } @@ -109,6 +119,22 @@ func (r *runtimeBindings) StreamBuffer() eventstream.Source { return r.streamBuffer } +func (r *runtimeBindings) Recorder() *parquet.Recorder { + r.mu.RLock() + defer r.mu.RUnlock() + return r.recorder +} + +func (r *runtimeBindings) StreamSequencer() *eventstream.Sequencer { + r.mu.RLock() + defer r.mu.RUnlock() + return r.streamSeq +} + +func (r *runtimeBindings) FilterEpoch() uint64 { + return r.filterEpoch.Load() +} + func (r *runtimeBindings) SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) { r.mu.Lock() r.liveTrieSource = liveTrie @@ -155,6 +181,10 @@ func (r *runtimeBindings) resetStreamBuffer() { r.streamSource = r.streamBuffer } +func (r *runtimeBindings) advanceFilterEpoch() uint64 { + return r.filterEpoch.Add(1) +} + func (r *runtimeBindings) resetDashboardSnapshotSource() *statsengine.Snapshot { src := r.dashboardSnapshotSource() if src == nil { @@ -173,13 +203,18 @@ func (r *runtimeBindings) resetDashboardSnapshotSource() *statsengine.Snapshot { // RuntimeBindingsFromContext returns model-scoped trace bindings when the // context was created by the TUI. func RuntimeBindingsFromContext(ctx context.Context) (TraceRuntimeBindings, bool) { - bindings, ok := ctx.Value(runtimeBindingsContextKey{}).(*runtimeBindings) + bindings, ok := ctx.Value(runtimeBindingsContextKey{}).(TraceRuntimeBindings) if !ok || bindings == nil { return nil, false } return bindings, true } +// ContextWithRuntimeBindings stores trace runtime bindings on the context. +func ContextWithRuntimeBindings(ctx context.Context, bindings TraceRuntimeBindings) context.Context { + return context.WithValue(ctx, runtimeBindingsContextKey{}, bindings) +} + // ContextWithTraceFilters stores the active trace filters for the trace starter. func ContextWithTraceFilters(ctx context.Context, filter globalfilter.Filter) context.Context { filters := traceFilters{filter: filter.Clone()} @@ -724,7 +759,7 @@ func (m Model) cancelPickerToDashboard() (tea.Model, tea.Cmd) { func (m *Model) beginTraceCmd() tea.Cmd { ctx, cancel := context.WithCancel(context.Background()) m.traceStop = cancel - ctx = context.WithValue(ctx, runtimeBindingsContextKey{}, m.runtime) + ctx = ContextWithRuntimeBindings(ctx, m.runtime) ctx = ContextWithTraceFilters(ctx, m.globalFilter) return startTraceCmd(m.startTrace, ctx) } @@ -816,6 +851,7 @@ func (m Model) applyGlobalFilter(filter globalfilter.Filter, action string) (tea return m, nil } + m.runtime.advanceFilterEpoch() m.stopTrace() m.dashboard.PrepareForTraceRestart() m.attaching = true @@ -837,6 +873,7 @@ func (m Model) undoGlobalFilter() (tea.Model, tea.Cmd) { return m, nil } + m.runtime.advanceFilterEpoch() m.stopTrace() m.dashboard.PrepareForTraceRestart() m.attaching = true diff --git a/internal/tui/tui_test.go b/internal/tui/tui_test.go index 361f69e..70e2b5b 100644 --- a/internal/tui/tui_test.go +++ b/internal/tui/tui_test.go @@ -73,6 +73,19 @@ func TestTraceFiltersContextRoundTripClonesPayload(t *testing.T) { } } +func TestRuntimeBindingsContextRoundTrip(t *testing.T) { + runtime := newRuntimeBindings() + + ctx := ContextWithRuntimeBindings(context.Background(), runtime) + got, ok := RuntimeBindingsFromContext(ctx) + if !ok { + t.Fatalf("expected runtime bindings in context") + } + if got != runtime { + t.Fatalf("expected same runtime bindings instance from context") + } +} + func TestPidSelectedTransitionsToDashboardAndSetsPIDFilter(t *testing.T) { flags.SetPidFilter(-1) flags.SetTidFilter(99) @@ -466,6 +479,29 @@ func TestRuntimeBindingsProvidePersistentStreamBuffer(t *testing.T) { } } +func TestRuntimeBindingsProvidePersistentRecorderAndSequencer(t *testing.T) { + runtime := newRuntimeBindings() + + recorder := runtime.Recorder() + if recorder == nil { + t.Fatalf("expected persistent recorder") + } + if got := runtime.Recorder(); got != recorder { + t.Fatalf("expected recorder pointer to remain stable") + } + + seq := runtime.StreamSequencer() + if seq == nil { + t.Fatalf("expected persistent stream sequencer") + } + if got := seq.Next(); got != 1 { + t.Fatalf("first persistent sequence = %d, want 1", got) + } + if got := runtime.StreamSequencer().Next(); got != 2 { + t.Fatalf("second persistent sequence = %d, want 2", got) + } +} + func TestProbeToggledMsgResetsDashboardStatsSource(t *testing.T) { src := &fakeResettableDashboardSource{snap: &statsengine.Snapshot{TotalSyscalls: 99}} @@ -562,6 +598,39 @@ func TestGlobalFilterApplyPreservesBufferedStreamRowsAcrossRestart(t *testing.T) } } +func TestGlobalFilterApplyAdvancesRuntimeFilterEpochAndKeepsRecorder(t *testing.T) { + m := NewModelWithConfig(flags.Config{PidFilter: -1, TidFilter: -1, TUIExportEnable: true}, -1, func(context.Context) error { return nil }) + m.screen = ScreenDashboard + m.attaching = false + + initialRecorder := m.runtime.Recorder() + if initialRecorder == nil { + t.Fatalf("expected runtime recorder") + } + if got := m.runtime.FilterEpoch(); got != 0 { + t.Fatalf("initial filter epoch = %d, want 0", got) + } + + next, _ := m.Update(tea.KeyPressMsg{Code: []rune{'f'}[0], Text: string([]rune{'f'})}) + m = next.(Model) + next, _ = m.Update(tea.KeyPressMsg{Code: tea.KeyEnter}) + m = next.(Model) + next, _ = m.Update(tea.KeyPressMsg{Code: []rune("read")[0], Text: string([]rune("read"))}) + m = next.(Model) + next, _ = m.Update(tea.KeyPressMsg{Code: tea.KeyEsc}) + m = next.(Model) + + if got := m.runtime.FilterEpoch(); got != 1 { + t.Fatalf("filter epoch after apply = %d, want 1", got) + } + if got := m.runtime.Recorder(); got != initialRecorder { + t.Fatalf("expected runtime recorder to survive filter restart") + } + if !m.attaching { + t.Fatalf("expected filter apply to restart tracing") + } +} + func TestTracingStartedUsesCurrentViewportForFlameNavigationWithoutResize(t *testing.T) { trie := coreflamegraph.NewLiveTrie([]string{"comm", "path", "tracepoint"}, "count") coreflamegraph.SeedTestFlameData(trie) |
