diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-18 09:21:25 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-18 09:21:25 +0200 |
| commit | 630ea0ff27b8e9ff9287eaaf67660845406a19a6 (patch) | |
| tree | 84a6d4cec5780fd0f677d9b1f22238f775aa21f1 | |
| parent | 3f85aa438bffaad287a450898c44942634944c22 (diff) | |
refactor: split ior.go mega-file into focused files (task 427)
ior.go had 763 lines covering 9+ concerns. Follow the eventloop_*.go
pattern and extract into three focused files:
- ior_bpfsetup.go: libbpfTracepoint{Program,Module} adapter types,
setupBPFModule, setupBPFModuleError, setupEventChannel
- ior_profiling.go: profilingControl type, setupProfiling,
profilingFilesForMode, stop()
- ior_parquet_sink.go: headlessParquetSink type, runHeadlessParquet,
isHeadlessParquetMode, hasHeadlessParquetContentFilters,
headlessParquetTraceConfig; inline parquetMetadata one-liner
ior.go shrinks from 763 → 453 lines, retaining entry, dispatch,
TUI wiring, and core trace execution.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
| -rw-r--r-- | internal/ior.go | 310 | ||||
| -rw-r--r-- | internal/ior_bpfsetup.go | 89 | ||||
| -rw-r--r-- | internal/ior_parquet_sink.go | 155 | ||||
| -rw-r--r-- | internal/ior_profiling.go | 118 |
4 files changed, 362 insertions, 310 deletions
diff --git a/internal/ior.go b/internal/ior.go index dba90c3..4bd5c8a 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -6,15 +6,10 @@ import ( "fmt" "os" "os/signal" - "runtime" - "runtime/pprof" - "runtime/trace" - "strings" "sync" "syscall" "time" - appconfig "ior/internal/config" "ior/internal/event" "ior/internal/flags" "ior/internal/flamegraph" @@ -22,8 +17,6 @@ import ( "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" - "ior/internal/streamrow" - "ior/internal/tracepoints" "ior/internal/tui" "ior/internal/tui/eventstream" @@ -42,31 +35,11 @@ var ( errRootPrivilegesRequired = errors.New("tracing requires root privileges (run with sudo)") ) -type libbpfTracepointProgram struct { - prog *bpf.BPFProg -} - -func (p libbpfTracepointProgram) AttachTracepoint(category, name string) (probemanager.Link, error) { - return p.prog.AttachTracepoint(category, name) -} - -type libbpfTracepointModule struct { - module *bpf.Module -} - type streamEventSink interface { eventstream.Source Push(eventstream.StreamEvent) } -func (m libbpfTracepointModule) GetProgram(progName string) (probemanager.Program, error) { - prog, err := m.module.GetProgram(progName) - if err != nil { - return nil, err - } - return libbpfTracepointProgram{prog: prog}, nil -} - // Run is the main entry point for the ior binary. // cfg must be provided by the caller; it should not be fetched from the global singleton here. func Run(cfg flags.Config) error { @@ -314,69 +287,6 @@ func runTrace(cfg flags.Config) error { return runTraceWithContext(context.Background(), cfg, nil, nil) } -func runHeadlessParquet(cfg flags.Config) error { - if getEUID() != 0 { - return errRootPrivilegesRequired - } - - cfg = headlessParquetTraceConfig(cfg) - logln := newLogger(true) - - bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg) - if err != nil { - return err - } - defer bpfModule.Close() - defer mgr.Close() - defer releaseBindings() - - ch, err := setupEventChannel(bpfModule) - if err != nil { - return err - } - ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln) - defer cancel() - defer stopSignals() - - profiling, err := setupProfiling(ctx, cfg, nil) - if err != nil { - return err - } - - el, err := newEventLoop(newEventLoopConfig(cfg)) - if err != nil { - return err - } - - recorder := parquet.NewRecorder(parquet.RecorderConfig{}) - if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquetMetadata("headless")}); err != nil { - return err - } - - sink := newHeadlessParquetSink(recorder, cancel) - configureEventLoopOutput(el, mgr, sink.configure) - startTraceShutdownWatcher(ctx, true, el, profiling, logln) - - startTime := time.Now() - el.run(ctx, ch) - totalDuration := time.Since(startTime) - <-profiling.done - - stopErr := recorder.Stop() - if err := sink.err(); err != nil { - if stopErr != nil && !errors.Is(stopErr, err) { - return errors.Join(err, stopErr) - } - return err - } - if stopErr != nil { - return stopErr - } - - logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) - return nil -} - func newEventLoopConfig(cfg flags.Config) eventLoopConfig { fields := make([]string, len(cfg.CollapsedFields)) copy(fields, cfg.CollapsedFields) @@ -395,15 +305,6 @@ func traceFilterFromConfig(cfg flags.Config) globalfilter.Filter { return cfg.TraceFilter() } -type profilingControl struct { - done chan struct{} - enabled bool - cpuProfile *os.File - memProfile *os.File - stopExecTrace func() - stopOnce sync.Once -} - func newLogger(verbose bool) func(...any) { if !verbose { return func(...any) {} @@ -411,56 +312,6 @@ func newLogger(verbose bool) func(...any) { return func(args ...any) { _, _ = fmt.Println(args...) } } -func setupBPFModuleError(stage string, err error) error { - if err == nil { - return nil - } - return fmt.Errorf("setup BPF module: %s: %w", stage, err) -} - -func setupBPFModule(parentCtx context.Context, cfg flags.Config) (*bpf.Module, *probemanager.Manager, func(), error) { - releaseBindings := func() {} - - bpfModule, stage, err := loadBPFModule() - if err != nil { - return nil, nil, releaseBindings, setupBPFModuleError(stage, err) - } - if err := resizeBPFMaps(cfg, bpfModule); err != nil { - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("resize maps", err) - } - if err := setBPFGlobals(cfg, bpfModule); err != nil { - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("set globals", err) - } - if err := bpfModule.BPFLoadObject(); err != nil { - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("load object", err) - } - - mgr := probemanager.NewManager(libbpfTracepointModule{module: bpfModule}) - if err := mgr.AttachAll(cfg.ShouldIAttachTracepoint, tracepoints.List); err != nil { - mgr.Close() - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("attach probes", err) - } - if bindings, ok := tui.RuntimeBindingsFromContext(parentCtx); ok { - bindings.SetProbeManager(mgr) - releaseBindings = func() { bindings.SetProbeManager(nil) } - } - return bpfModule, mgr, releaseBindings, nil -} - -func setupEventChannel(bpfModule *bpf.Module) (chan []byte, error) { - ch := make(chan []byte, appconfig.DefaultChannelBufferSize) - rb, err := bpfModule.InitRingBuf("event_map", ch) - if err != nil { - return nil, err - } - rb.Poll(300) - return ch, nil -} - func setupTraceContext(parentCtx context.Context, cfg flags.Config, logln func(...any)) (context.Context, context.CancelFunc, func()) { ctx := parentCtx cancel := func() {} @@ -489,88 +340,6 @@ func setupTraceContext(parentCtx context.Context, cfg flags.Config, logln func(. return ctx, cancel, stopSignals } -func setupProfiling(ctx context.Context, cfg flags.Config, started chan<- struct{}) (*profilingControl, error) { - control := &profilingControl{ - done: make(chan struct{}), - stopExecTrace: func() {}, - } - if !cfg.PprofEnable { - close(control.done) - return control, nil - } - - control.enabled = true - isTUIMode := started != nil - cpuProfilePath, memProfilePath, execTracePath, execTraceDuration := profilingFilesForMode(isTUIMode) - - cpuProfile, err := os.Create(cpuProfilePath) - if err != nil { - return nil, err - } - memProfile, err := os.Create(memProfilePath) - if err != nil { - _ = cpuProfile.Close() - return nil, err - } - control.cpuProfile = cpuProfile - control.memProfile = memProfile - - if execTracePath != "" { - execTraceProfile, err := os.Create(execTracePath) - if err != nil { - _ = cpuProfile.Close() - _ = memProfile.Close() - return nil, err - } - if err := trace.Start(execTraceProfile); err != nil { - _ = cpuProfile.Close() - _ = memProfile.Close() - _ = execTraceProfile.Close() - return nil, err - } - var stopOnce sync.Once - control.stopExecTrace = func() { - stopOnce.Do(func() { - trace.Stop() - _ = execTraceProfile.Close() - }) - } - go func() { - timer := time.NewTimer(execTraceDuration) - defer timer.Stop() - select { - case <-ctx.Done(): - case <-timer.C: - } - control.stopExecTrace() - }() - } - - if err := pprof.StartCPUProfile(cpuProfile); err != nil { - control.stopExecTrace() - _ = cpuProfile.Close() - _ = memProfile.Close() - return nil, err - } - return control, nil -} - -func (p *profilingControl) stop(logln func(...any)) { - p.stopOnce.Do(func() { - if !p.enabled { - return - } - logln("Stopping profiling and writing profile files") - pprof.StopCPUProfile() - runtime.GC() - _ = pprof.WriteHeapProfile(p.memProfile) - p.stopExecTrace() - _ = p.cpuProfile.Close() - _ = p.memProfile.Close() - close(p.done) - }) -} - func configureEventLoopOutput(el *eventLoop, mgr *probemanager.Manager, configure func(*eventLoop)) { if configure != nil { configure(el) @@ -682,82 +451,3 @@ func signalTraceStarted(started chan<- struct{}) { func shouldAutoStopByDuration(cfg flags.Config) bool { return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) } - -func isHeadlessParquetMode(cfg flags.Config) bool { - return strings.TrimSpace(cfg.ParquetPath) != "" -} - -func hasHeadlessParquetContentFilters(cfg flags.Config) bool { - return cfg.CommFilter != "" || - cfg.PathFilter != "" || - cfg.PidFilter > 0 || - cfg.TidFilter > 0 || - cfg.GlobalFilter.IsActive() -} - -func headlessParquetTraceConfig(cfg flags.Config) flags.Config { - out := cfg - out.PlainMode = false - out.FlamegraphOutput = false - out.CommFilter = "" - out.PathFilter = "" - out.PidFilter = -1 - out.TidFilter = -1 - out.GlobalFilter = globalfilter.Filter{} - return out -} - -// parquetMetadata delegates to the canonical parquet.NewFileMetadata. -func parquetMetadata(mode string) parquet.FileMetadata { - return parquet.NewFileMetadata(mode) -} - -type headlessParquetSink struct { - recorder *parquet.Recorder - seq *streamrow.Sequencer - cancel context.CancelFunc - - mu sync.Mutex - recErr error -} - -func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink { - return &headlessParquetSink{ - recorder: recorder, - seq: streamrow.NewSequencer(0), - cancel: cancel, - } -} - -func (s *headlessParquetSink) configure(el *eventLoop) { - el.printCb = func(ep *event.Pair) { - row := streamrow.New(s.seq.Next(), ep) - if err := s.recorder.Record(row, 0); err != nil { - s.fail(err) - } - ep.Recycle() - } -} - -func (s *headlessParquetSink) fail(err error) { - s.mu.Lock() - defer s.mu.Unlock() - if s.recErr != nil { - return - } - s.recErr = err - s.cancel() -} - -func (s *headlessParquetSink) err() error { - s.mu.Lock() - defer s.mu.Unlock() - return s.recErr -} - -func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) { - if tuiMode { - return "ior-tui-cpu.prof", "ior-tui-mem.prof", "ior-tui-trace.out", 10 * time.Second - } - return "ior.cpuprofile", "ior.memprofile", "", 0 -} diff --git a/internal/ior_bpfsetup.go b/internal/ior_bpfsetup.go new file mode 100644 index 0000000..cf0b112 --- /dev/null +++ b/internal/ior_bpfsetup.go @@ -0,0 +1,89 @@ +package internal + +import ( + "context" + "fmt" + + appconfig "ior/internal/config" + "ior/internal/flags" + "ior/internal/probemanager" + "ior/internal/tracepoints" + "ior/internal/tui" + + bpf "github.com/aquasecurity/libbpfgo" +) + +// libbpfTracepointProgram wraps a libbpf BPF program as a probemanager.Program. +type libbpfTracepointProgram struct { + prog *bpf.BPFProg +} + +func (p libbpfTracepointProgram) AttachTracepoint(category, name string) (probemanager.Link, error) { + return p.prog.AttachTracepoint(category, name) +} + +// libbpfTracepointModule wraps a libbpf BPF module as a probemanager.Module. +type libbpfTracepointModule struct { + module *bpf.Module +} + +func (m libbpfTracepointModule) GetProgram(progName string) (probemanager.Program, error) { + prog, err := m.module.GetProgram(progName) + if err != nil { + return nil, err + } + return libbpfTracepointProgram{prog: prog}, nil +} + +func setupBPFModuleError(stage string, err error) error { + if err == nil { + return nil + } + return fmt.Errorf("setup BPF module: %s: %w", stage, err) +} + +// setupBPFModule loads and attaches the BPF module, attaching tracepoints +// and registering the probe manager with any TUI runtime bindings. +func setupBPFModule(parentCtx context.Context, cfg flags.Config) (*bpf.Module, *probemanager.Manager, func(), error) { + releaseBindings := func() {} + + bpfModule, stage, err := loadBPFModule() + if err != nil { + return nil, nil, releaseBindings, setupBPFModuleError(stage, err) + } + if err := resizeBPFMaps(cfg, bpfModule); err != nil { + bpfModule.Close() + return nil, nil, releaseBindings, setupBPFModuleError("resize maps", err) + } + if err := setBPFGlobals(cfg, bpfModule); err != nil { + bpfModule.Close() + return nil, nil, releaseBindings, setupBPFModuleError("set globals", err) + } + if err := bpfModule.BPFLoadObject(); err != nil { + bpfModule.Close() + return nil, nil, releaseBindings, setupBPFModuleError("load object", err) + } + + mgr := probemanager.NewManager(libbpfTracepointModule{module: bpfModule}) + if err := mgr.AttachAll(cfg.ShouldIAttachTracepoint, tracepoints.List); err != nil { + mgr.Close() + bpfModule.Close() + return nil, nil, releaseBindings, setupBPFModuleError("attach probes", err) + } + if bindings, ok := tui.RuntimeBindingsFromContext(parentCtx); ok { + bindings.SetProbeManager(mgr) + releaseBindings = func() { bindings.SetProbeManager(nil) } + } + return bpfModule, mgr, releaseBindings, nil +} + +// setupEventChannel initialises the BPF ring-buffer and returns the event channel. +func setupEventChannel(bpfModule *bpf.Module) (chan []byte, error) { + ch := make(chan []byte, appconfig.DefaultChannelBufferSize) + rb, err := bpfModule.InitRingBuf("event_map", ch) + if err != nil { + return nil, err + } + rb.Poll(300) + return ch, nil +} diff --git a/internal/ior_parquet_sink.go b/internal/ior_parquet_sink.go new file mode 100644 index 0000000..eb187e8 --- /dev/null +++ b/internal/ior_parquet_sink.go @@ -0,0 +1,155 @@ +package internal + +import ( + "context" + "errors" + "strings" + "sync" + "time" + + "ior/internal/event" + "ior/internal/flags" + "ior/internal/globalfilter" + "ior/internal/parquet" + "ior/internal/streamrow" +) + +// headlessParquetSink streams traced events directly to a Parquet file, +// cancelling the trace context on any recorder error. +type headlessParquetSink struct { + recorder *parquet.Recorder + seq *streamrow.Sequencer + cancel context.CancelFunc + + mu sync.Mutex + recErr error +} + +func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink { + return &headlessParquetSink{ + recorder: recorder, + seq: streamrow.NewSequencer(0), + cancel: cancel, + } +} + +// configure wires the event loop's print callback to record each pair to Parquet. +func (s *headlessParquetSink) configure(el *eventLoop) { + el.printCb = func(ep *event.Pair) { + row := streamrow.New(s.seq.Next(), ep) + if err := s.recorder.Record(row, 0); err != nil { + s.fail(err) + } + ep.Recycle() + } +} + +func (s *headlessParquetSink) fail(err error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.recErr != nil { + return + } + s.recErr = err + s.cancel() +} + +func (s *headlessParquetSink) err() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.recErr +} + +// isHeadlessParquetMode reports whether cfg specifies a headless Parquet recording run. +func isHeadlessParquetMode(cfg flags.Config) bool { + return strings.TrimSpace(cfg.ParquetPath) != "" +} + +// hasHeadlessParquetContentFilters reports whether cfg carries filters that are +// incompatible with headless Parquet mode (all events must be recorded unfiltered). +func hasHeadlessParquetContentFilters(cfg flags.Config) bool { + return cfg.CommFilter != "" || + cfg.PathFilter != "" || + cfg.PidFilter > 0 || + cfg.TidFilter > 0 || + cfg.GlobalFilter.IsActive() +} + +// headlessParquetTraceConfig strips TUI-only flags from cfg so that the +// headless Parquet run records a clean, unfiltered event stream. +func headlessParquetTraceConfig(cfg flags.Config) flags.Config { + out := cfg + out.PlainMode = false + out.FlamegraphOutput = false + out.CommFilter = "" + out.PathFilter = "" + out.PidFilter = -1 + out.TidFilter = -1 + out.GlobalFilter = globalfilter.Filter{} + return out +} + +// runHeadlessParquet records all traced syscalls directly to a Parquet file +// without starting the TUI. +func runHeadlessParquet(cfg flags.Config) error { + if getEUID() != 0 { + return errRootPrivilegesRequired + } + + cfg = headlessParquetTraceConfig(cfg) + logln := newLogger(true) + + bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg) + if err != nil { + return err + } + defer bpfModule.Close() + defer mgr.Close() + defer releaseBindings() + + ch, err := setupEventChannel(bpfModule) + if err != nil { + return err + } + ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln) + defer cancel() + defer stopSignals() + + profiling, err := setupProfiling(ctx, cfg, nil) + if err != nil { + return err + } + + el, err := newEventLoop(newEventLoopConfig(cfg)) + if err != nil { + return err + } + + recorder := parquet.NewRecorder(parquet.RecorderConfig{}) + if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquet.NewFileMetadata("headless")}); err != nil { + return err + } + + sink := newHeadlessParquetSink(recorder, cancel) + configureEventLoopOutput(el, mgr, sink.configure) + startTraceShutdownWatcher(ctx, true, el, profiling, logln) + + startTime := time.Now() + el.run(ctx, ch) + totalDuration := time.Since(startTime) + <-profiling.done + + stopErr := recorder.Stop() + if err := sink.err(); err != nil { + if stopErr != nil && !errors.Is(stopErr, err) { + return errors.Join(err, stopErr) + } + return err + } + if stopErr != nil { + return stopErr + } + + logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) + return nil +} diff --git a/internal/ior_profiling.go b/internal/ior_profiling.go new file mode 100644 index 0000000..8ca3922 --- /dev/null +++ b/internal/ior_profiling.go @@ -0,0 +1,118 @@ +package internal + +import ( + "context" + "os" + "runtime" + "runtime/pprof" + "runtime/trace" + "sync" + "time" + + "ior/internal/flags" +) + +// profilingControl manages optional CPU, memory, and execution-trace profiling +// for a single tracing run. +type profilingControl struct { + done chan struct{} + enabled bool + cpuProfile *os.File + memProfile *os.File + stopExecTrace func() + stopOnce sync.Once +} + +// setupProfiling starts profiling if cfg.PprofEnable is set and returns a +// control handle. The caller must wait on control.done after the trace ends. +// started is non-nil in TUI mode; nil in plain/flamegraph mode. +func setupProfiling(ctx context.Context, cfg flags.Config, started chan<- struct{}) (*profilingControl, error) { + control := &profilingControl{ + done: make(chan struct{}), + stopExecTrace: func() {}, + } + if !cfg.PprofEnable { + close(control.done) + return control, nil + } + + control.enabled = true + isTUIMode := started != nil + cpuProfilePath, memProfilePath, execTracePath, execTraceDuration := profilingFilesForMode(isTUIMode) + + cpuProfile, err := os.Create(cpuProfilePath) + if err != nil { + return nil, err + } + memProfile, err := os.Create(memProfilePath) + if err != nil { + _ = cpuProfile.Close() + return nil, err + } + control.cpuProfile = cpuProfile + control.memProfile = memProfile + + if execTracePath != "" { + execTraceProfile, err := os.Create(execTracePath) + if err != nil { + _ = cpuProfile.Close() + _ = memProfile.Close() + return nil, err + } + if err := trace.Start(execTraceProfile); err != nil { + _ = cpuProfile.Close() + _ = memProfile.Close() + _ = execTraceProfile.Close() + return nil, err + } + var stopOnce sync.Once + control.stopExecTrace = func() { + stopOnce.Do(func() { + trace.Stop() + _ = execTraceProfile.Close() + }) + } + go func() { + timer := time.NewTimer(execTraceDuration) + defer timer.Stop() + select { + case <-ctx.Done(): + case <-timer.C: + } + control.stopExecTrace() + }() + } + + if err := pprof.StartCPUProfile(cpuProfile); err != nil { + control.stopExecTrace() + _ = cpuProfile.Close() + _ = memProfile.Close() + return nil, err + } + return control, nil +} + +func (p *profilingControl) stop(logln func(...any)) { + p.stopOnce.Do(func() { + if !p.enabled { + return + } + logln("Stopping profiling and writing profile files") + pprof.StopCPUProfile() + runtime.GC() + _ = pprof.WriteHeapProfile(p.memProfile) + p.stopExecTrace() + _ = p.cpuProfile.Close() + _ = p.memProfile.Close() + close(p.done) + }) +} + +// profilingFilesForMode returns the file paths and exec-trace duration to use +// depending on whether the binary is running in TUI mode or plain/flamegraph mode. +func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) { + if tuiMode { + return "ior-tui-cpu.prof", "ior-tui-mem.prof", "ior-tui-trace.out", 10 * time.Second + } + return "ior.cpuprofile", "ior.memprofile", "", 0 +} |
