diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 07:14:00 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 07:14:00 +0200 |
| commit | c9d22e32dc9d8d0447beb4ffa78f47a03d0cddc4 (patch) | |
| tree | 18db76552b60a215a5ec9ef95a692aafc6af6564 /internal | |
| parent | 2e401326d7abf687f2f67537cfe1b7f93d548305 (diff) | |
feat: add headless parquet recording mode
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/flags/flags.go | 2 | ||||
| -rw-r--r-- | internal/flags/flags_test.go | 10 | ||||
| -rw-r--r-- | internal/ior.go | 169 | ||||
| -rw-r--r-- | internal/ior_mode_test.go | 120 |
4 files changed, 299 insertions, 2 deletions
diff --git a/internal/flags/flags.go b/internal/flags/flags.go index 9161378..fc42210 100644 --- a/internal/flags/flags.go +++ b/internal/flags/flags.go @@ -44,6 +44,7 @@ type Config struct { // Output/runtime flags PlainMode bool FlamegraphOutput bool + ParquetPath string OutputName string TestFlames bool TestLiveFlames bool @@ -170,6 +171,7 @@ func parse() error { flag.BoolVar(&cfg.PlainMode, "plain", false, "Enable plain CSV output mode (disable TUI)") flag.BoolVar(&cfg.FlamegraphOutput, "flamegraph", false, "Write aggregated .ior.zst output for trace/integration workflows") + flag.StringVar(&cfg.ParquetPath, "parquet", cfg.ParquetPath, "Write all traced syscall rows directly to a parquet file and skip the TUI") flag.StringVar(&cfg.OutputName, "name", cfg.OutputName, "Base name for .ior.zst trace output files") flag.BoolVar(&cfg.TestFlames, "testflames", false, "Run TUI with static synthetic flamegraph data for keyboard-navigation testing") flag.BoolVar(&cfg.TestLiveFlames, "testliveflames", false, "Run TUI with continuously-updating synthetic flamegraph data for live keyboard-navigation testing") diff --git a/internal/flags/flags_test.go b/internal/flags/flags_test.go index 4485f34..f5274d8 100644 --- a/internal/flags/flags_test.go +++ b/internal/flags/flags_test.go @@ -115,6 +115,16 @@ func TestParseFlamegraphOutputFlags(t *testing.T) { } } +func TestParseParquetOutputFlag(t *testing.T) { + cfg, err := parseForTest(t, "--parquet", "trace-run") + if err != nil { + t.Fatalf("parse returned error: %v", err) + } + if got, want := cfg.ParquetPath, "trace-run"; got != want { + t.Fatalf("parquet path = %q, want %q", got, want) + } +} + func TestParseDefaultCollapsedFieldsOrder(t *testing.T) { cfg, err := parseForTest(t) if err != nil { diff --git a/internal/ior.go b/internal/ior.go index 12aab7c..1010445 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -11,6 +11,7 @@ import ( "runtime" "runtime/pprof" "runtime/trace" + "strings" "sync" "syscall" "time" @@ -23,6 +24,7 @@ import ( "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" + "ior/internal/streamrow" "ior/internal/tracepoints" "ior/internal/tui" "ior/internal/tui/eventstream" @@ -32,6 +34,7 @@ import ( var ( runTraceFn = runTrace + runParquetFn = runHeadlessParquet runTraceWithContextFn = runTraceWithContext runTUIFn = tui.RunWithTraceStarterConfig runTUITestFlamesFn = tui.RunTestFlamesWithTraceStarterConfig @@ -82,6 +85,9 @@ func dispatchRun(cfg flags.Config) error { if cfg.TestLiveFlames { return runTUITestLiveFlamesFn(cfg, tuiTestLiveFlamesStarter(cfg)) } + if isHeadlessParquetMode(cfg) { + return runParquetFn(cfg) + } if shouldRunTraceMode(cfg) { return runTraceFn(cfg) } @@ -89,6 +95,23 @@ func dispatchRun(cfg flags.Config) error { } func validateRunConfig(cfg flags.Config) error { + if isHeadlessParquetMode(cfg) { + if cfg.TestFlames { + return errors.New("--testflames cannot be combined with -parquet") + } + if cfg.TestLiveFlames { + return errors.New("--testliveflames cannot be combined with -parquet") + } + if cfg.PlainMode { + return errors.New("-parquet and -plain are mutually exclusive") + } + if cfg.FlamegraphOutput { + return errors.New("-parquet and -flamegraph are mutually exclusive") + } + if hasHeadlessParquetContentFilters(cfg) { + return errors.New("-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)") + } + } if cfg.TestFlames && cfg.PlainMode { return errors.New("--testflames cannot be combined with -plain") } @@ -178,7 +201,7 @@ func runSyntheticLiveFlames(ctx context.Context, liveTrie *flamegraph.LiveTrie, } func shouldRunTraceMode(cfg flags.Config) bool { - return cfg.PlainMode || cfg.FlamegraphOutput + return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) } func tuiTraceStarterFromRunTrace( @@ -312,6 +335,69 @@ func runTrace(cfg flags.Config) error { return runTraceWithContext(context.Background(), cfg, nil, nil) } +func runHeadlessParquet(cfg flags.Config) error { + if getEUID() != 0 { + return errRootPrivilegesRequired + } + + cfg = headlessParquetTraceConfig(cfg) + logln := newLogger(true) + + bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg) + if err != nil { + return err + } + defer bpfModule.Close() + defer mgr.Close() + defer releaseBindings() + + ch, err := setupEventChannel(bpfModule) + if err != nil { + return err + } + ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln) + defer cancel() + defer stopSignals() + + profiling, err := setupProfiling(ctx, cfg, nil) + if err != nil { + return err + } + + el, err := newEventLoop(newEventLoopConfig(cfg)) + if err != nil { + return err + } + + recorder := parquet.NewRecorder(parquet.RecorderConfig{}) + if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquetMetadata("headless")}); err != nil { + return err + } + + sink := newHeadlessParquetSink(recorder, cancel) + configureEventLoopOutput(el, mgr, sink.configure) + startTraceShutdownWatcher(ctx, true, el, profiling, logln) + + startTime := time.Now() + el.run(ctx, ch) + totalDuration := time.Since(startTime) + <-profiling.done + + stopErr := recorder.Stop() + if err := sink.err(); err != nil { + if stopErr != nil && !errors.Is(stopErr, err) { + return errors.Join(err, stopErr) + } + return err + } + if stopErr != nil { + return stopErr + } + + logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) + return nil +} + func newEventLoopConfig(cfg flags.Config) eventLoopConfig { fields := make([]string, len(cfg.CollapsedFields)) copy(fields, cfg.CollapsedFields) @@ -630,7 +716,86 @@ func signalTraceStarted(started chan<- struct{}) { } func shouldAutoStopByDuration(cfg flags.Config) bool { - return cfg.PlainMode || cfg.FlamegraphOutput + return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) +} + +func isHeadlessParquetMode(cfg flags.Config) bool { + return strings.TrimSpace(cfg.ParquetPath) != "" +} + +func hasHeadlessParquetContentFilters(cfg flags.Config) bool { + return cfg.CommFilter != "" || + cfg.PathFilter != "" || + cfg.PidFilter > 0 || + cfg.TidFilter > 0 || + cfg.GlobalFilter.IsActive() +} + +func headlessParquetTraceConfig(cfg flags.Config) flags.Config { + out := cfg + out.PlainMode = false + out.FlamegraphOutput = false + out.CommFilter = "" + out.PathFilter = "" + out.PidFilter = -1 + out.TidFilter = -1 + out.GlobalFilter = globalfilter.Filter{} + return out +} + +func parquetMetadata(mode string) parquet.FileMetadata { + meta := parquet.FileMetadata{ + StartedAtUnixNano: uint64(time.Now().UnixNano()), + Mode: mode, + IORVersion: flags.Version, + } + if hostname, err := os.Hostname(); err == nil { + meta.Hostname = hostname + } + return meta +} + +type headlessParquetSink struct { + recorder *parquet.Recorder + seq *streamrow.Sequencer + cancel context.CancelFunc + + mu sync.Mutex + recErr error +} + +func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink { + return &headlessParquetSink{ + recorder: recorder, + seq: streamrow.NewSequencer(0), + cancel: cancel, + } +} + +func (s *headlessParquetSink) configure(el *eventLoop) { + el.printCb = func(ep *event.Pair) { + row := streamrow.New(s.seq.Next(), ep) + if err := s.recorder.Record(row, 0); err != nil { + s.fail(err) + } + ep.Recycle() + } +} + +func (s *headlessParquetSink) fail(err error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.recErr != nil { + return + } + s.recErr = err + s.cancel() +} + +func (s *headlessParquetSink) err() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.recErr } func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) { diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index 08452bb..ced80fc 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -38,6 +38,12 @@ func TestShouldRunTraceMode(t *testing.T) { t.Fatalf("expected plain mode to use trace mode") } + withParquet := base + withParquet.ParquetPath = "trace.parquet" + if !shouldRunTraceMode(withParquet) { + t.Fatalf("expected parquet mode to use trace mode") + } + withPprof := base withPprof.PprofEnable = true if shouldRunTraceMode(withPprof) { @@ -69,6 +75,12 @@ func TestShouldAutoStopByDuration(t *testing.T) { t.Fatalf("expected plain mode to auto-stop by duration") } + withParquet := base + withParquet.ParquetPath = "trace.parquet" + if !shouldAutoStopByDuration(withParquet) { + t.Fatalf("expected parquet mode to auto-stop by duration") + } + withPprof := base withPprof.PprofEnable = true if shouldAutoStopByDuration(withPprof) { @@ -79,11 +91,13 @@ func TestShouldAutoStopByDuration(t *testing.T) { func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) { origRunTrace := runTraceFn + origRunParquet := runParquetFn origRunTUI := runTUIFn origRunTUITestFlames := runTUITestFlamesFn origRunTUITestLiveFlames := runTUITestLiveFlamesFn defer func() { runTraceFn = origRunTrace + runParquetFn = origRunParquet runTUIFn = origRunTUI runTUITestFlamesFn = origRunTUITestFlames runTUITestLiveFlamesFn = origRunTUITestLiveFlames @@ -95,6 +109,10 @@ func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) { traceCalled = true return nil } + runParquetFn = func(flags.Config) error { + t.Fatalf("runParquetFn should not be called in plain trace mode") + return nil + } runTUIFn = func(flags.Config, tui.TraceStarter) error { tuiCalled = true return nil @@ -120,6 +138,59 @@ func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) { } } +func TestDispatchRunUsesHeadlessParquetModeWhenRequested(t *testing.T) { + origRunTrace := runTraceFn + origRunParquet := runParquetFn + origRunTUI := runTUIFn + origRunTUITestFlames := runTUITestFlamesFn + origRunTUITestLiveFlames := runTUITestLiveFlamesFn + defer func() { + runTraceFn = origRunTrace + runParquetFn = origRunParquet + runTUIFn = origRunTUI + runTUITestFlamesFn = origRunTUITestFlames + runTUITestLiveFlamesFn = origRunTUITestLiveFlames + }() + + traceCalled := false + parquetCalled := false + tuiCalled := false + runTraceFn = func(flags.Config) error { + traceCalled = true + return nil + } + runParquetFn = func(flags.Config) error { + parquetCalled = true + return nil + } + runTUIFn = func(flags.Config, tui.TraceStarter) error { + tuiCalled = true + return nil + } + runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error { + t.Fatalf("runTUITestFlamesFn should not be called in parquet mode") + return nil + } + runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error { + t.Fatalf("runTUITestLiveFlamesFn should not be called in parquet mode") + return nil + } + + cfg := flags.Config{ParquetPath: "trace.parquet"} + if err := dispatchRun(cfg); err != nil { + t.Fatalf("dispatchRun returned error: %v", err) + } + if !parquetCalled { + t.Fatalf("expected runParquetFn to be called") + } + if traceCalled { + t.Fatalf("did not expect runTraceFn to be called") + } + if tuiCalled { + t.Fatalf("did not expect runTUIFn to be called") + } +} + func TestDispatchRunUsesTUIWhenOnlyPprofEnabled(t *testing.T) { origRunTrace := runTraceFn origRunTUI := runTUIFn @@ -353,6 +424,31 @@ func TestValidateRunConfigRejectsBothTestModes(t *testing.T) { } } +func TestValidateRunConfigRejectsParquetWithPlain(t *testing.T) { + cfg := flags.Config{ParquetPath: "trace.parquet", PlainMode: true} + err := validateRunConfig(cfg) + if err == nil { + t.Fatalf("expected error for -parquet with -plain") + } + if err.Error() != "-parquet and -plain are mutually exclusive" { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestValidateRunConfigRejectsParquetWithContentFilters(t *testing.T) { + cfg := flags.Config{ + ParquetPath: "trace.parquet", + CommFilter: "nginx", + } + err := validateRunConfig(cfg) + if err == nil { + t.Fatalf("expected error for -parquet with content filters") + } + if err.Error() != "-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)" { + t.Fatalf("unexpected error: %v", err) + } +} + func TestBuildTestFlamesRuntimeSeedsLiveTrie(t *testing.T) { cfg := flags.NewFlags() _, streamBuf, liveTrie := buildTestFlamesRuntime(cfg) @@ -561,6 +657,30 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) { } } +func TestHeadlessParquetTraceConfigClearsContentFilters(t *testing.T) { + cfg := flags.Config{ + ParquetPath: "trace.parquet", + PidFilter: 1234, + TidFilter: 5678, + CommFilter: "nginx", + PathFilter: "/var/log", + GlobalFilter: globalfilter.Filter{ + Syscall: &globalfilter.StringFilter{Pattern: "read"}, + }, + } + + got := headlessParquetTraceConfig(cfg) + if got.PidFilter != -1 || got.TidFilter != -1 { + t.Fatalf("pid/tid filters = %d/%d, want -1/-1", got.PidFilter, got.TidFilter) + } + if got.CommFilter != "" || got.PathFilter != "" { + t.Fatalf("comm/path filters = %q/%q, want empty", got.CommFilter, got.PathFilter) + } + if got.GlobalFilter.IsActive() { + t.Fatalf("expected sanitized global filter to be empty, got %+v", got.GlobalFilter) + } +} + func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) { recorder := parquet.NewRecorder(parquet.RecorderConfig{ BatchSize: 1, |
