summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 07:14:00 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 07:14:00 +0200
commitc9d22e32dc9d8d0447beb4ffa78f47a03d0cddc4 (patch)
tree18db76552b60a215a5ec9ef95a692aafc6af6564 /internal
parent2e401326d7abf687f2f67537cfe1b7f93d548305 (diff)
feat: add headless parquet recording mode
Diffstat (limited to 'internal')
-rw-r--r--internal/flags/flags.go2
-rw-r--r--internal/flags/flags_test.go10
-rw-r--r--internal/ior.go169
-rw-r--r--internal/ior_mode_test.go120
4 files changed, 299 insertions, 2 deletions
diff --git a/internal/flags/flags.go b/internal/flags/flags.go
index 9161378..fc42210 100644
--- a/internal/flags/flags.go
+++ b/internal/flags/flags.go
@@ -44,6 +44,7 @@ type Config struct {
// Output/runtime flags
PlainMode bool
FlamegraphOutput bool
+ ParquetPath string
OutputName string
TestFlames bool
TestLiveFlames bool
@@ -170,6 +171,7 @@ func parse() error {
flag.BoolVar(&cfg.PlainMode, "plain", false, "Enable plain CSV output mode (disable TUI)")
flag.BoolVar(&cfg.FlamegraphOutput, "flamegraph", false, "Write aggregated .ior.zst output for trace/integration workflows")
+ flag.StringVar(&cfg.ParquetPath, "parquet", cfg.ParquetPath, "Write all traced syscall rows directly to a parquet file and skip the TUI")
flag.StringVar(&cfg.OutputName, "name", cfg.OutputName, "Base name for .ior.zst trace output files")
flag.BoolVar(&cfg.TestFlames, "testflames", false, "Run TUI with static synthetic flamegraph data for keyboard-navigation testing")
flag.BoolVar(&cfg.TestLiveFlames, "testliveflames", false, "Run TUI with continuously-updating synthetic flamegraph data for live keyboard-navigation testing")
diff --git a/internal/flags/flags_test.go b/internal/flags/flags_test.go
index 4485f34..f5274d8 100644
--- a/internal/flags/flags_test.go
+++ b/internal/flags/flags_test.go
@@ -115,6 +115,16 @@ func TestParseFlamegraphOutputFlags(t *testing.T) {
}
}
+func TestParseParquetOutputFlag(t *testing.T) {
+ cfg, err := parseForTest(t, "--parquet", "trace-run")
+ if err != nil {
+ t.Fatalf("parse returned error: %v", err)
+ }
+ if got, want := cfg.ParquetPath, "trace-run"; got != want {
+ t.Fatalf("parquet path = %q, want %q", got, want)
+ }
+}
+
func TestParseDefaultCollapsedFieldsOrder(t *testing.T) {
cfg, err := parseForTest(t)
if err != nil {
diff --git a/internal/ior.go b/internal/ior.go
index 12aab7c..1010445 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -11,6 +11,7 @@ import (
"runtime"
"runtime/pprof"
"runtime/trace"
+ "strings"
"sync"
"syscall"
"time"
@@ -23,6 +24,7 @@ import (
"ior/internal/parquet"
"ior/internal/probemanager"
"ior/internal/statsengine"
+ "ior/internal/streamrow"
"ior/internal/tracepoints"
"ior/internal/tui"
"ior/internal/tui/eventstream"
@@ -32,6 +34,7 @@ import (
var (
runTraceFn = runTrace
+ runParquetFn = runHeadlessParquet
runTraceWithContextFn = runTraceWithContext
runTUIFn = tui.RunWithTraceStarterConfig
runTUITestFlamesFn = tui.RunTestFlamesWithTraceStarterConfig
@@ -82,6 +85,9 @@ func dispatchRun(cfg flags.Config) error {
if cfg.TestLiveFlames {
return runTUITestLiveFlamesFn(cfg, tuiTestLiveFlamesStarter(cfg))
}
+ if isHeadlessParquetMode(cfg) {
+ return runParquetFn(cfg)
+ }
if shouldRunTraceMode(cfg) {
return runTraceFn(cfg)
}
@@ -89,6 +95,23 @@ func dispatchRun(cfg flags.Config) error {
}
func validateRunConfig(cfg flags.Config) error {
+ if isHeadlessParquetMode(cfg) {
+ if cfg.TestFlames {
+ return errors.New("--testflames cannot be combined with -parquet")
+ }
+ if cfg.TestLiveFlames {
+ return errors.New("--testliveflames cannot be combined with -parquet")
+ }
+ if cfg.PlainMode {
+ return errors.New("-parquet and -plain are mutually exclusive")
+ }
+ if cfg.FlamegraphOutput {
+ return errors.New("-parquet and -flamegraph are mutually exclusive")
+ }
+ if hasHeadlessParquetContentFilters(cfg) {
+ return errors.New("-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)")
+ }
+ }
if cfg.TestFlames && cfg.PlainMode {
return errors.New("--testflames cannot be combined with -plain")
}
@@ -178,7 +201,7 @@ func runSyntheticLiveFlames(ctx context.Context, liveTrie *flamegraph.LiveTrie,
}
func shouldRunTraceMode(cfg flags.Config) bool {
- return cfg.PlainMode || cfg.FlamegraphOutput
+ return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg)
}
func tuiTraceStarterFromRunTrace(
@@ -312,6 +335,69 @@ func runTrace(cfg flags.Config) error {
return runTraceWithContext(context.Background(), cfg, nil, nil)
}
+func runHeadlessParquet(cfg flags.Config) error {
+ if getEUID() != 0 {
+ return errRootPrivilegesRequired
+ }
+
+ cfg = headlessParquetTraceConfig(cfg)
+ logln := newLogger(true)
+
+ bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg)
+ if err != nil {
+ return err
+ }
+ defer bpfModule.Close()
+ defer mgr.Close()
+ defer releaseBindings()
+
+ ch, err := setupEventChannel(bpfModule)
+ if err != nil {
+ return err
+ }
+ ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln)
+ defer cancel()
+ defer stopSignals()
+
+ profiling, err := setupProfiling(ctx, cfg, nil)
+ if err != nil {
+ return err
+ }
+
+ el, err := newEventLoop(newEventLoopConfig(cfg))
+ if err != nil {
+ return err
+ }
+
+ recorder := parquet.NewRecorder(parquet.RecorderConfig{})
+ if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquetMetadata("headless")}); err != nil {
+ return err
+ }
+
+ sink := newHeadlessParquetSink(recorder, cancel)
+ configureEventLoopOutput(el, mgr, sink.configure)
+ startTraceShutdownWatcher(ctx, true, el, profiling, logln)
+
+ startTime := time.Now()
+ el.run(ctx, ch)
+ totalDuration := time.Since(startTime)
+ <-profiling.done
+
+ stopErr := recorder.Stop()
+ if err := sink.err(); err != nil {
+ if stopErr != nil && !errors.Is(stopErr, err) {
+ return errors.Join(err, stopErr)
+ }
+ return err
+ }
+ if stopErr != nil {
+ return stopErr
+ }
+
+ logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration)
+ return nil
+}
+
func newEventLoopConfig(cfg flags.Config) eventLoopConfig {
fields := make([]string, len(cfg.CollapsedFields))
copy(fields, cfg.CollapsedFields)
@@ -630,7 +716,86 @@ func signalTraceStarted(started chan<- struct{}) {
}
func shouldAutoStopByDuration(cfg flags.Config) bool {
- return cfg.PlainMode || cfg.FlamegraphOutput
+ return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg)
+}
+
+func isHeadlessParquetMode(cfg flags.Config) bool {
+ return strings.TrimSpace(cfg.ParquetPath) != ""
+}
+
+func hasHeadlessParquetContentFilters(cfg flags.Config) bool {
+ return cfg.CommFilter != "" ||
+ cfg.PathFilter != "" ||
+ cfg.PidFilter > 0 ||
+ cfg.TidFilter > 0 ||
+ cfg.GlobalFilter.IsActive()
+}
+
+func headlessParquetTraceConfig(cfg flags.Config) flags.Config {
+ out := cfg
+ out.PlainMode = false
+ out.FlamegraphOutput = false
+ out.CommFilter = ""
+ out.PathFilter = ""
+ out.PidFilter = -1
+ out.TidFilter = -1
+ out.GlobalFilter = globalfilter.Filter{}
+ return out
+}
+
+func parquetMetadata(mode string) parquet.FileMetadata {
+ meta := parquet.FileMetadata{
+ StartedAtUnixNano: uint64(time.Now().UnixNano()),
+ Mode: mode,
+ IORVersion: flags.Version,
+ }
+ if hostname, err := os.Hostname(); err == nil {
+ meta.Hostname = hostname
+ }
+ return meta
+}
+
+type headlessParquetSink struct {
+ recorder *parquet.Recorder
+ seq *streamrow.Sequencer
+ cancel context.CancelFunc
+
+ mu sync.Mutex
+ recErr error
+}
+
+func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink {
+ return &headlessParquetSink{
+ recorder: recorder,
+ seq: streamrow.NewSequencer(0),
+ cancel: cancel,
+ }
+}
+
+func (s *headlessParquetSink) configure(el *eventLoop) {
+ el.printCb = func(ep *event.Pair) {
+ row := streamrow.New(s.seq.Next(), ep)
+ if err := s.recorder.Record(row, 0); err != nil {
+ s.fail(err)
+ }
+ ep.Recycle()
+ }
+}
+
+func (s *headlessParquetSink) fail(err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.recErr != nil {
+ return
+ }
+ s.recErr = err
+ s.cancel()
+}
+
+func (s *headlessParquetSink) err() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.recErr
}
func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) {
diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go
index 08452bb..ced80fc 100644
--- a/internal/ior_mode_test.go
+++ b/internal/ior_mode_test.go
@@ -38,6 +38,12 @@ func TestShouldRunTraceMode(t *testing.T) {
t.Fatalf("expected plain mode to use trace mode")
}
+ withParquet := base
+ withParquet.ParquetPath = "trace.parquet"
+ if !shouldRunTraceMode(withParquet) {
+ t.Fatalf("expected parquet mode to use trace mode")
+ }
+
withPprof := base
withPprof.PprofEnable = true
if shouldRunTraceMode(withPprof) {
@@ -69,6 +75,12 @@ func TestShouldAutoStopByDuration(t *testing.T) {
t.Fatalf("expected plain mode to auto-stop by duration")
}
+ withParquet := base
+ withParquet.ParquetPath = "trace.parquet"
+ if !shouldAutoStopByDuration(withParquet) {
+ t.Fatalf("expected parquet mode to auto-stop by duration")
+ }
+
withPprof := base
withPprof.PprofEnable = true
if shouldAutoStopByDuration(withPprof) {
@@ -79,11 +91,13 @@ func TestShouldAutoStopByDuration(t *testing.T) {
func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) {
origRunTrace := runTraceFn
+ origRunParquet := runParquetFn
origRunTUI := runTUIFn
origRunTUITestFlames := runTUITestFlamesFn
origRunTUITestLiveFlames := runTUITestLiveFlamesFn
defer func() {
runTraceFn = origRunTrace
+ runParquetFn = origRunParquet
runTUIFn = origRunTUI
runTUITestFlamesFn = origRunTUITestFlames
runTUITestLiveFlamesFn = origRunTUITestLiveFlames
@@ -95,6 +109,10 @@ func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) {
traceCalled = true
return nil
}
+ runParquetFn = func(flags.Config) error {
+ t.Fatalf("runParquetFn should not be called in plain trace mode")
+ return nil
+ }
runTUIFn = func(flags.Config, tui.TraceStarter) error {
tuiCalled = true
return nil
@@ -120,6 +138,59 @@ func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) {
}
}
+func TestDispatchRunUsesHeadlessParquetModeWhenRequested(t *testing.T) {
+ origRunTrace := runTraceFn
+ origRunParquet := runParquetFn
+ origRunTUI := runTUIFn
+ origRunTUITestFlames := runTUITestFlamesFn
+ origRunTUITestLiveFlames := runTUITestLiveFlamesFn
+ defer func() {
+ runTraceFn = origRunTrace
+ runParquetFn = origRunParquet
+ runTUIFn = origRunTUI
+ runTUITestFlamesFn = origRunTUITestFlames
+ runTUITestLiveFlamesFn = origRunTUITestLiveFlames
+ }()
+
+ traceCalled := false
+ parquetCalled := false
+ tuiCalled := false
+ runTraceFn = func(flags.Config) error {
+ traceCalled = true
+ return nil
+ }
+ runParquetFn = func(flags.Config) error {
+ parquetCalled = true
+ return nil
+ }
+ runTUIFn = func(flags.Config, tui.TraceStarter) error {
+ tuiCalled = true
+ return nil
+ }
+ runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error {
+ t.Fatalf("runTUITestFlamesFn should not be called in parquet mode")
+ return nil
+ }
+ runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error {
+ t.Fatalf("runTUITestLiveFlamesFn should not be called in parquet mode")
+ return nil
+ }
+
+ cfg := flags.Config{ParquetPath: "trace.parquet"}
+ if err := dispatchRun(cfg); err != nil {
+ t.Fatalf("dispatchRun returned error: %v", err)
+ }
+ if !parquetCalled {
+ t.Fatalf("expected runParquetFn to be called")
+ }
+ if traceCalled {
+ t.Fatalf("did not expect runTraceFn to be called")
+ }
+ if tuiCalled {
+ t.Fatalf("did not expect runTUIFn to be called")
+ }
+}
+
func TestDispatchRunUsesTUIWhenOnlyPprofEnabled(t *testing.T) {
origRunTrace := runTraceFn
origRunTUI := runTUIFn
@@ -353,6 +424,31 @@ func TestValidateRunConfigRejectsBothTestModes(t *testing.T) {
}
}
+func TestValidateRunConfigRejectsParquetWithPlain(t *testing.T) {
+ cfg := flags.Config{ParquetPath: "trace.parquet", PlainMode: true}
+ err := validateRunConfig(cfg)
+ if err == nil {
+ t.Fatalf("expected error for -parquet with -plain")
+ }
+ if err.Error() != "-parquet and -plain are mutually exclusive" {
+ t.Fatalf("unexpected error: %v", err)
+ }
+}
+
+func TestValidateRunConfigRejectsParquetWithContentFilters(t *testing.T) {
+ cfg := flags.Config{
+ ParquetPath: "trace.parquet",
+ CommFilter: "nginx",
+ }
+ err := validateRunConfig(cfg)
+ if err == nil {
+ t.Fatalf("expected error for -parquet with content filters")
+ }
+ if err.Error() != "-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)" {
+ t.Fatalf("unexpected error: %v", err)
+ }
+}
+
func TestBuildTestFlamesRuntimeSeedsLiveTrie(t *testing.T) {
cfg := flags.NewFlags()
_, streamBuf, liveTrie := buildTestFlamesRuntime(cfg)
@@ -561,6 +657,30 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) {
}
}
+func TestHeadlessParquetTraceConfigClearsContentFilters(t *testing.T) {
+ cfg := flags.Config{
+ ParquetPath: "trace.parquet",
+ PidFilter: 1234,
+ TidFilter: 5678,
+ CommFilter: "nginx",
+ PathFilter: "/var/log",
+ GlobalFilter: globalfilter.Filter{
+ Syscall: &globalfilter.StringFilter{Pattern: "read"},
+ },
+ }
+
+ got := headlessParquetTraceConfig(cfg)
+ if got.PidFilter != -1 || got.TidFilter != -1 {
+ t.Fatalf("pid/tid filters = %d/%d, want -1/-1", got.PidFilter, got.TidFilter)
+ }
+ if got.CommFilter != "" || got.PathFilter != "" {
+ t.Fatalf("comm/path filters = %q/%q, want empty", got.CommFilter, got.PathFilter)
+ }
+ if got.GlobalFilter.IsActive() {
+ t.Fatalf("expected sanitized global filter to be empty, got %+v", got.GlobalFilter)
+ }
+}
+
func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) {
recorder := parquet.NewRecorder(parquet.RecorderConfig{
BatchSize: 1,