summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-12 23:54:44 +0200
committerPaul Buetow <paul@buetow.org>2026-03-12 23:54:44 +0200
commit2e401326d7abf687f2f67537cfe1b7f93d548305 (patch)
tree027547b0958d1ef1f236e507ae89dee414af204b
parent767c2b54779cbf81b68217c6e83868cffb6a0965 (diff)
feat: persist parquet recording across TUI restarts
-rw-r--r--internal/ior.go23
-rw-r--r--internal/ior_mode_test.go175
-rw-r--r--internal/tui/tui.go41
-rw-r--r--internal/tui/tui_test.go69
4 files changed, 304 insertions, 4 deletions
diff --git a/internal/ior.go b/internal/ior.go
index ea44fea..12aab7c 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -20,6 +20,7 @@ import (
"ior/internal/flags"
"ior/internal/flamegraph"
"ior/internal/globalfilter"
+ "ior/internal/parquet"
"ior/internal/probemanager"
"ior/internal/statsengine"
"ior/internal/tracepoints"
@@ -197,7 +198,11 @@ func tuiTraceStarterFromRunTrace(
engine := statsengine.NewEngine(64)
streamBuf := streamEventSink(eventstream.NewRingBuffer())
streamSource := eventstream.Source(streamBuf)
+ streamSeq := eventstream.NewSequencer(0)
liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField)
+ filterEpoch := uint64(0)
+ var recorderWarningOnce sync.Once
+ var recorder *parquet.Recorder
if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
if persistent := bindings.StreamBuffer(); persistent != nil {
streamSource = persistent
@@ -207,12 +212,16 @@ func tuiTraceStarterFromRunTrace(
return fmt.Errorf("runtime stream source does not support event pushes")
}
}
+ if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil {
+ streamSeq = persistentSeq
+ }
+ recorder = bindings.Recorder()
+ filterEpoch = bindings.FilterEpoch()
bindings.SetDashboardSnapshotSource(engine)
bindings.SetEventStreamSource(streamSource)
bindings.SetLiveTrie(liveTrie)
}
streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize)
- streamSeq := eventstream.NewSequencer(0)
go func() {
for ev := range streamEvents {
@@ -230,8 +239,18 @@ func tuiTraceStarterFromRunTrace(
ep.Recycle()
return
}
+ row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
engine.Ingest(ep)
- streamEvents <- eventstream.NewStreamEvent(streamSeq.Next(), ep)
+ streamEvents <- row
+ if recorder != nil {
+ if err := recorder.Record(row, filterEpoch); err != nil {
+ recorderWarningOnce.Do(func() {
+ if el.warningCb != nil {
+ el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err))
+ }
+ })
+ }
+ }
liveTrie.Ingest(ep)
// Both downstream consumers snapshot the pair synchronously, so
// the pooled pair can be recycled immediately afterwards.
diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go
index 8fbc79c..08452bb 100644
--- a/internal/ior_mode_test.go
+++ b/internal/ior_mode_test.go
@@ -5,6 +5,9 @@ import (
"context"
"encoding/json"
"errors"
+ "io"
+ "os"
+ "path/filepath"
"testing"
"testing/synctest"
"time"
@@ -13,8 +16,13 @@ import (
"ior/internal/file"
"ior/internal/flags"
"ior/internal/globalfilter"
+ "ior/internal/parquet"
"ior/internal/tui"
+ "ior/internal/tui/eventstream"
+ flamegraphtui "ior/internal/tui/flamegraph"
"ior/internal/types"
+
+ parquetgo "github.com/parquet-go/parquet-go"
)
func TestShouldRunTraceMode(t *testing.T) {
@@ -552,3 +560,170 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) {
t.Fatalf("expected context canceled, got %v", err)
}
}
+
+func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) {
+ recorder := parquet.NewRecorder(parquet.RecorderConfig{
+ BatchSize: 1,
+ FlushInterval: time.Hour,
+ })
+ if err := recorder.Start(filepath.Join(t.TempDir(), "trace"), parquet.StartOptions{
+ Metadata: parquet.FileMetadata{Mode: "tui"},
+ }); err != nil {
+ t.Fatalf("recorder.Start() error = %v", err)
+ }
+
+ bindings := &traceRuntimeBindingsStub{
+ streamBuffer: eventstream.NewRingBuffer(),
+ streamSeq: eventstream.NewSequencer(0),
+ recorder: recorder,
+ }
+ base := flags.NewFlags()
+ base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}}
+
+ runs := [][]*event.Pair{
+ {
+ testTracePair(1, "keep"),
+ testTracePair(99, "drop"),
+ },
+ {
+ testTracePair(2, "keep"),
+ },
+ }
+ runIndex := 0
+ starter := tuiTraceStarterFromRunTrace(
+ base,
+ func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error {
+ el := &eventLoop{}
+ configure(el)
+ for _, pair := range runs[runIndex] {
+ el.printCb(pair)
+ }
+ runIndex++
+ close(started)
+ return nil
+ },
+ )
+
+ ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings)
+ if err := starter(ctx); err != nil {
+ t.Fatalf("first starter() error = %v", err)
+ }
+ waitForStreamRows(t, bindings.streamBuffer, 1)
+
+ bindings.filterEpoch = 1
+ if err := starter(ctx); err != nil {
+ t.Fatalf("second starter() error = %v", err)
+ }
+ waitForStreamRows(t, bindings.streamBuffer, 2)
+
+ if err := recorder.Stop(); err != nil {
+ t.Fatalf("recorder.Stop() error = %v", err)
+ }
+
+ status := recorder.Status()
+ if status.LastError != nil {
+ t.Fatalf("recorder status error = %v, want nil", status.LastError)
+ }
+
+ got := readRecordedParquet(t, status.Path)
+ if len(got) != 2 {
+ t.Fatalf("recorded rows = %d, want 2", len(got))
+ }
+ if got[0].Seq != 1 || got[1].Seq != 2 {
+ t.Fatalf("recorded seq = %d,%d, want 1,2", got[0].Seq, got[1].Seq)
+ }
+ if got[0].FilterEpoch != 0 || got[1].FilterEpoch != 1 {
+ t.Fatalf("recorded filter epochs = %d,%d, want 0,1", got[0].FilterEpoch, got[1].FilterEpoch)
+ }
+ if snapshot := bindings.streamBuffer.Snapshot(); len(snapshot) != 2 {
+ t.Fatalf("stream buffer rows = %d, want 2", len(snapshot))
+ }
+}
+
+type traceRuntimeBindingsStub struct {
+ streamBuffer *eventstream.RingBuffer
+ streamSource eventstream.Source
+ streamSeq *eventstream.Sequencer
+ recorder *parquet.Recorder
+ filterEpoch uint64
+}
+
+func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {}
+
+func (b *traceRuntimeBindingsStub) SetEventStreamSource(source eventstream.Source) {
+ b.streamSource = source
+}
+
+func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {}
+
+func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {}
+
+func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source {
+ return b.streamBuffer
+}
+
+func (b *traceRuntimeBindingsStub) Recorder() *parquet.Recorder {
+ return b.recorder
+}
+
+func (b *traceRuntimeBindingsStub) StreamSequencer() *eventstream.Sequencer {
+ return b.streamSeq
+}
+
+func (b *traceRuntimeBindingsStub) FilterEpoch() uint64 {
+ return b.filterEpoch
+}
+
+func testTracePair(seq uint64, comm string) *event.Pair {
+ enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: seq * 10, Pid: 42, Tid: 84}
+ exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: seq*10 + 1, Ret: int64(seq), Pid: 42, Tid: 84}
+ pair := event.NewPair(enter)
+ pair.ExitEv = exit
+ pair.File = file.NewFd(int32(seq), "/tmp/test", 0)
+ pair.Comm = comm
+ pair.Duration = seq
+ pair.DurationToPrev = seq + 1
+ pair.Bytes = seq + 2
+ return pair
+}
+
+func waitForStreamRows(t *testing.T, buffer *eventstream.RingBuffer, want int) {
+ t.Helper()
+ deadline := time.Now().Add(2 * time.Second)
+ for time.Now().Before(deadline) {
+ if buffer.Len() == want {
+ return
+ }
+ time.Sleep(time.Millisecond)
+ }
+ t.Fatalf("stream buffer len = %d, want %d", buffer.Len(), want)
+}
+
+func readRecordedParquet(t *testing.T, path string) []parquet.Record {
+ t.Helper()
+
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("open parquet %q: %v", path, err)
+ }
+ defer f.Close()
+
+ reader := parquetgo.NewGenericReader[parquet.Record](f)
+ defer reader.Close()
+
+ var rows []parquet.Record
+ buf := make([]parquet.Record, 4)
+ for {
+ n, err := reader.Read(buf)
+ if n > 0 {
+ rows = append(rows, buf[:n]...)
+ }
+ if err == nil {
+ continue
+ }
+ if err == io.EOF {
+ return rows
+ }
+ t.Fatalf("read parquet rows: %v", err)
+ }
+}
diff --git a/internal/tui/tui.go b/internal/tui/tui.go
index 434e813..29d1fc8 100644
--- a/internal/tui/tui.go
+++ b/internal/tui/tui.go
@@ -8,10 +8,12 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"ior/internal/flags"
"ior/internal/globalfilter"
+ "ior/internal/parquet"
"ior/internal/probemanager"
"ior/internal/statsengine"
common "ior/internal/tui/common"
@@ -64,6 +66,9 @@ type TraceRuntimeBindings interface {
SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource)
SetProbeManager(manager ProbeManager)
StreamBuffer() eventstream.Source
+ Recorder() *parquet.Recorder
+ StreamSequencer() *eventstream.Sequencer
+ FilterEpoch() uint64
}
type runtimeBindingsContextKey struct{}
@@ -75,8 +80,11 @@ type runtimeBindings struct {
snapshotSource SnapshotSource
streamSource eventstream.Source
streamBuffer *eventstream.RingBuffer
+ streamSeq *eventstream.Sequencer
+ recorder *parquet.Recorder
liveTrieSource flamegraphtui.LiveTrieSource
probeManager ProbeManager
+ filterEpoch atomic.Uint64
}
type traceFilters struct {
@@ -88,6 +96,8 @@ func newRuntimeBindings() *runtimeBindings {
return &runtimeBindings{
streamSource: streamBuffer,
streamBuffer: streamBuffer,
+ streamSeq: eventstream.NewSequencer(0),
+ recorder: parquet.NewRecorder(parquet.RecorderConfig{}),
}
}
@@ -109,6 +119,22 @@ func (r *runtimeBindings) StreamBuffer() eventstream.Source {
return r.streamBuffer
}
+func (r *runtimeBindings) Recorder() *parquet.Recorder {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.recorder
+}
+
+func (r *runtimeBindings) StreamSequencer() *eventstream.Sequencer {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.streamSeq
+}
+
+func (r *runtimeBindings) FilterEpoch() uint64 {
+ return r.filterEpoch.Load()
+}
+
func (r *runtimeBindings) SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) {
r.mu.Lock()
r.liveTrieSource = liveTrie
@@ -155,6 +181,10 @@ func (r *runtimeBindings) resetStreamBuffer() {
r.streamSource = r.streamBuffer
}
+func (r *runtimeBindings) advanceFilterEpoch() uint64 {
+ return r.filterEpoch.Add(1)
+}
+
func (r *runtimeBindings) resetDashboardSnapshotSource() *statsengine.Snapshot {
src := r.dashboardSnapshotSource()
if src == nil {
@@ -173,13 +203,18 @@ func (r *runtimeBindings) resetDashboardSnapshotSource() *statsengine.Snapshot {
// RuntimeBindingsFromContext returns model-scoped trace bindings when the
// context was created by the TUI.
func RuntimeBindingsFromContext(ctx context.Context) (TraceRuntimeBindings, bool) {
- bindings, ok := ctx.Value(runtimeBindingsContextKey{}).(*runtimeBindings)
+ bindings, ok := ctx.Value(runtimeBindingsContextKey{}).(TraceRuntimeBindings)
if !ok || bindings == nil {
return nil, false
}
return bindings, true
}
+// ContextWithRuntimeBindings stores trace runtime bindings on the context.
+func ContextWithRuntimeBindings(ctx context.Context, bindings TraceRuntimeBindings) context.Context {
+ return context.WithValue(ctx, runtimeBindingsContextKey{}, bindings)
+}
+
// ContextWithTraceFilters stores the active trace filters for the trace starter.
func ContextWithTraceFilters(ctx context.Context, filter globalfilter.Filter) context.Context {
filters := traceFilters{filter: filter.Clone()}
@@ -724,7 +759,7 @@ func (m Model) cancelPickerToDashboard() (tea.Model, tea.Cmd) {
func (m *Model) beginTraceCmd() tea.Cmd {
ctx, cancel := context.WithCancel(context.Background())
m.traceStop = cancel
- ctx = context.WithValue(ctx, runtimeBindingsContextKey{}, m.runtime)
+ ctx = ContextWithRuntimeBindings(ctx, m.runtime)
ctx = ContextWithTraceFilters(ctx, m.globalFilter)
return startTraceCmd(m.startTrace, ctx)
}
@@ -816,6 +851,7 @@ func (m Model) applyGlobalFilter(filter globalfilter.Filter, action string) (tea
return m, nil
}
+ m.runtime.advanceFilterEpoch()
m.stopTrace()
m.dashboard.PrepareForTraceRestart()
m.attaching = true
@@ -837,6 +873,7 @@ func (m Model) undoGlobalFilter() (tea.Model, tea.Cmd) {
return m, nil
}
+ m.runtime.advanceFilterEpoch()
m.stopTrace()
m.dashboard.PrepareForTraceRestart()
m.attaching = true
diff --git a/internal/tui/tui_test.go b/internal/tui/tui_test.go
index 361f69e..70e2b5b 100644
--- a/internal/tui/tui_test.go
+++ b/internal/tui/tui_test.go
@@ -73,6 +73,19 @@ func TestTraceFiltersContextRoundTripClonesPayload(t *testing.T) {
}
}
+func TestRuntimeBindingsContextRoundTrip(t *testing.T) {
+ runtime := newRuntimeBindings()
+
+ ctx := ContextWithRuntimeBindings(context.Background(), runtime)
+ got, ok := RuntimeBindingsFromContext(ctx)
+ if !ok {
+ t.Fatalf("expected runtime bindings in context")
+ }
+ if got != runtime {
+ t.Fatalf("expected same runtime bindings instance from context")
+ }
+}
+
func TestPidSelectedTransitionsToDashboardAndSetsPIDFilter(t *testing.T) {
flags.SetPidFilter(-1)
flags.SetTidFilter(99)
@@ -466,6 +479,29 @@ func TestRuntimeBindingsProvidePersistentStreamBuffer(t *testing.T) {
}
}
+func TestRuntimeBindingsProvidePersistentRecorderAndSequencer(t *testing.T) {
+ runtime := newRuntimeBindings()
+
+ recorder := runtime.Recorder()
+ if recorder == nil {
+ t.Fatalf("expected persistent recorder")
+ }
+ if got := runtime.Recorder(); got != recorder {
+ t.Fatalf("expected recorder pointer to remain stable")
+ }
+
+ seq := runtime.StreamSequencer()
+ if seq == nil {
+ t.Fatalf("expected persistent stream sequencer")
+ }
+ if got := seq.Next(); got != 1 {
+ t.Fatalf("first persistent sequence = %d, want 1", got)
+ }
+ if got := runtime.StreamSequencer().Next(); got != 2 {
+ t.Fatalf("second persistent sequence = %d, want 2", got)
+ }
+}
+
func TestProbeToggledMsgResetsDashboardStatsSource(t *testing.T) {
src := &fakeResettableDashboardSource{snap: &statsengine.Snapshot{TotalSyscalls: 99}}
@@ -562,6 +598,39 @@ func TestGlobalFilterApplyPreservesBufferedStreamRowsAcrossRestart(t *testing.T)
}
}
+func TestGlobalFilterApplyAdvancesRuntimeFilterEpochAndKeepsRecorder(t *testing.T) {
+ m := NewModelWithConfig(flags.Config{PidFilter: -1, TidFilter: -1, TUIExportEnable: true}, -1, func(context.Context) error { return nil })
+ m.screen = ScreenDashboard
+ m.attaching = false
+
+ initialRecorder := m.runtime.Recorder()
+ if initialRecorder == nil {
+ t.Fatalf("expected runtime recorder")
+ }
+ if got := m.runtime.FilterEpoch(); got != 0 {
+ t.Fatalf("initial filter epoch = %d, want 0", got)
+ }
+
+ next, _ := m.Update(tea.KeyPressMsg{Code: []rune{'f'}[0], Text: string([]rune{'f'})})
+ m = next.(Model)
+ next, _ = m.Update(tea.KeyPressMsg{Code: tea.KeyEnter})
+ m = next.(Model)
+ next, _ = m.Update(tea.KeyPressMsg{Code: []rune("read")[0], Text: string([]rune("read"))})
+ m = next.(Model)
+ next, _ = m.Update(tea.KeyPressMsg{Code: tea.KeyEsc})
+ m = next.(Model)
+
+ if got := m.runtime.FilterEpoch(); got != 1 {
+ t.Fatalf("filter epoch after apply = %d, want 1", got)
+ }
+ if got := m.runtime.Recorder(); got != initialRecorder {
+ t.Fatalf("expected runtime recorder to survive filter restart")
+ }
+ if !m.attaching {
+ t.Fatalf("expected filter apply to restart tracing")
+ }
+}
+
func TestTracingStartedUsesCurrentViewportForFlameNavigationWithoutResize(t *testing.T) {
trie := coreflamegraph.NewLiveTrie([]string{"comm", "path", "tracepoint"}, "count")
coreflamegraph.SeedTestFlameData(trie)