diff options
Diffstat (limited to 'internal/ior.go')
| -rw-r--r-- | internal/ior.go | 310 |
1 files changed, 0 insertions, 310 deletions
diff --git a/internal/ior.go b/internal/ior.go index dba90c3..4bd5c8a 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -6,15 +6,10 @@ import ( "fmt" "os" "os/signal" - "runtime" - "runtime/pprof" - "runtime/trace" - "strings" "sync" "syscall" "time" - appconfig "ior/internal/config" "ior/internal/event" "ior/internal/flags" "ior/internal/flamegraph" @@ -22,8 +17,6 @@ import ( "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" - "ior/internal/streamrow" - "ior/internal/tracepoints" "ior/internal/tui" "ior/internal/tui/eventstream" @@ -42,31 +35,11 @@ var ( errRootPrivilegesRequired = errors.New("tracing requires root privileges (run with sudo)") ) -type libbpfTracepointProgram struct { - prog *bpf.BPFProg -} - -func (p libbpfTracepointProgram) AttachTracepoint(category, name string) (probemanager.Link, error) { - return p.prog.AttachTracepoint(category, name) -} - -type libbpfTracepointModule struct { - module *bpf.Module -} - type streamEventSink interface { eventstream.Source Push(eventstream.StreamEvent) } -func (m libbpfTracepointModule) GetProgram(progName string) (probemanager.Program, error) { - prog, err := m.module.GetProgram(progName) - if err != nil { - return nil, err - } - return libbpfTracepointProgram{prog: prog}, nil -} - // Run is the main entry point for the ior binary. // cfg must be provided by the caller; it should not be fetched from the global singleton here. func Run(cfg flags.Config) error { @@ -314,69 +287,6 @@ func runTrace(cfg flags.Config) error { return runTraceWithContext(context.Background(), cfg, nil, nil) } -func runHeadlessParquet(cfg flags.Config) error { - if getEUID() != 0 { - return errRootPrivilegesRequired - } - - cfg = headlessParquetTraceConfig(cfg) - logln := newLogger(true) - - bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg) - if err != nil { - return err - } - defer bpfModule.Close() - defer mgr.Close() - defer releaseBindings() - - ch, err := setupEventChannel(bpfModule) - if err != nil { - return err - } - ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln) - defer cancel() - defer stopSignals() - - profiling, err := setupProfiling(ctx, cfg, nil) - if err != nil { - return err - } - - el, err := newEventLoop(newEventLoopConfig(cfg)) - if err != nil { - return err - } - - recorder := parquet.NewRecorder(parquet.RecorderConfig{}) - if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquetMetadata("headless")}); err != nil { - return err - } - - sink := newHeadlessParquetSink(recorder, cancel) - configureEventLoopOutput(el, mgr, sink.configure) - startTraceShutdownWatcher(ctx, true, el, profiling, logln) - - startTime := time.Now() - el.run(ctx, ch) - totalDuration := time.Since(startTime) - <-profiling.done - - stopErr := recorder.Stop() - if err := sink.err(); err != nil { - if stopErr != nil && !errors.Is(stopErr, err) { - return errors.Join(err, stopErr) - } - return err - } - if stopErr != nil { - return stopErr - } - - logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) - return nil -} - func newEventLoopConfig(cfg flags.Config) eventLoopConfig { fields := make([]string, len(cfg.CollapsedFields)) copy(fields, cfg.CollapsedFields) @@ -395,15 +305,6 @@ func traceFilterFromConfig(cfg flags.Config) globalfilter.Filter { return cfg.TraceFilter() } -type profilingControl struct { - done chan struct{} - enabled bool - cpuProfile *os.File - memProfile *os.File - stopExecTrace func() - stopOnce sync.Once -} - func newLogger(verbose bool) func(...any) { if !verbose { return func(...any) {} @@ -411,56 +312,6 @@ func newLogger(verbose bool) func(...any) { return func(args ...any) { _, _ = fmt.Println(args...) } } -func setupBPFModuleError(stage string, err error) error { - if err == nil { - return nil - } - return fmt.Errorf("setup BPF module: %s: %w", stage, err) -} - -func setupBPFModule(parentCtx context.Context, cfg flags.Config) (*bpf.Module, *probemanager.Manager, func(), error) { - releaseBindings := func() {} - - bpfModule, stage, err := loadBPFModule() - if err != nil { - return nil, nil, releaseBindings, setupBPFModuleError(stage, err) - } - if err := resizeBPFMaps(cfg, bpfModule); err != nil { - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("resize maps", err) - } - if err := setBPFGlobals(cfg, bpfModule); err != nil { - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("set globals", err) - } - if err := bpfModule.BPFLoadObject(); err != nil { - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("load object", err) - } - - mgr := probemanager.NewManager(libbpfTracepointModule{module: bpfModule}) - if err := mgr.AttachAll(cfg.ShouldIAttachTracepoint, tracepoints.List); err != nil { - mgr.Close() - bpfModule.Close() - return nil, nil, releaseBindings, setupBPFModuleError("attach probes", err) - } - if bindings, ok := tui.RuntimeBindingsFromContext(parentCtx); ok { - bindings.SetProbeManager(mgr) - releaseBindings = func() { bindings.SetProbeManager(nil) } - } - return bpfModule, mgr, releaseBindings, nil -} - -func setupEventChannel(bpfModule *bpf.Module) (chan []byte, error) { - ch := make(chan []byte, appconfig.DefaultChannelBufferSize) - rb, err := bpfModule.InitRingBuf("event_map", ch) - if err != nil { - return nil, err - } - rb.Poll(300) - return ch, nil -} - func setupTraceContext(parentCtx context.Context, cfg flags.Config, logln func(...any)) (context.Context, context.CancelFunc, func()) { ctx := parentCtx cancel := func() {} @@ -489,88 +340,6 @@ func setupTraceContext(parentCtx context.Context, cfg flags.Config, logln func(. return ctx, cancel, stopSignals } -func setupProfiling(ctx context.Context, cfg flags.Config, started chan<- struct{}) (*profilingControl, error) { - control := &profilingControl{ - done: make(chan struct{}), - stopExecTrace: func() {}, - } - if !cfg.PprofEnable { - close(control.done) - return control, nil - } - - control.enabled = true - isTUIMode := started != nil - cpuProfilePath, memProfilePath, execTracePath, execTraceDuration := profilingFilesForMode(isTUIMode) - - cpuProfile, err := os.Create(cpuProfilePath) - if err != nil { - return nil, err - } - memProfile, err := os.Create(memProfilePath) - if err != nil { - _ = cpuProfile.Close() - return nil, err - } - control.cpuProfile = cpuProfile - control.memProfile = memProfile - - if execTracePath != "" { - execTraceProfile, err := os.Create(execTracePath) - if err != nil { - _ = cpuProfile.Close() - _ = memProfile.Close() - return nil, err - } - if err := trace.Start(execTraceProfile); err != nil { - _ = cpuProfile.Close() - _ = memProfile.Close() - _ = execTraceProfile.Close() - return nil, err - } - var stopOnce sync.Once - control.stopExecTrace = func() { - stopOnce.Do(func() { - trace.Stop() - _ = execTraceProfile.Close() - }) - } - go func() { - timer := time.NewTimer(execTraceDuration) - defer timer.Stop() - select { - case <-ctx.Done(): - case <-timer.C: - } - control.stopExecTrace() - }() - } - - if err := pprof.StartCPUProfile(cpuProfile); err != nil { - control.stopExecTrace() - _ = cpuProfile.Close() - _ = memProfile.Close() - return nil, err - } - return control, nil -} - -func (p *profilingControl) stop(logln func(...any)) { - p.stopOnce.Do(func() { - if !p.enabled { - return - } - logln("Stopping profiling and writing profile files") - pprof.StopCPUProfile() - runtime.GC() - _ = pprof.WriteHeapProfile(p.memProfile) - p.stopExecTrace() - _ = p.cpuProfile.Close() - _ = p.memProfile.Close() - close(p.done) - }) -} - func configureEventLoopOutput(el *eventLoop, mgr *probemanager.Manager, configure func(*eventLoop)) { if configure != nil { configure(el) @@ -682,82 +451,3 @@ func signalTraceStarted(started chan<- struct{}) { func shouldAutoStopByDuration(cfg flags.Config) bool { return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) } - -func isHeadlessParquetMode(cfg flags.Config) bool { - return strings.TrimSpace(cfg.ParquetPath) != "" -} - -func hasHeadlessParquetContentFilters(cfg flags.Config) bool { - return cfg.CommFilter != "" || - cfg.PathFilter != "" || - cfg.PidFilter > 0 || - cfg.TidFilter > 0 || - cfg.GlobalFilter.IsActive() -} - -func headlessParquetTraceConfig(cfg flags.Config) flags.Config { - out := cfg - out.PlainMode = false - out.FlamegraphOutput = false - out.CommFilter = "" - out.PathFilter = "" - out.PidFilter = -1 - out.TidFilter = -1 - out.GlobalFilter = globalfilter.Filter{} - return out -} - -// parquetMetadata delegates to the canonical parquet.NewFileMetadata. -func parquetMetadata(mode string) parquet.FileMetadata { - return parquet.NewFileMetadata(mode) -} - -type headlessParquetSink struct { - recorder *parquet.Recorder - seq *streamrow.Sequencer - cancel context.CancelFunc - - mu sync.Mutex - recErr error -} - -func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink { - return &headlessParquetSink{ - recorder: recorder, - seq: streamrow.NewSequencer(0), - cancel: cancel, - } -} - -func (s *headlessParquetSink) configure(el *eventLoop) { - el.printCb = func(ep *event.Pair) { - row := streamrow.New(s.seq.Next(), ep) - if err := s.recorder.Record(row, 0); err != nil { - s.fail(err) - } - ep.Recycle() - } -} - -func (s *headlessParquetSink) fail(err error) { - s.mu.Lock() - defer s.mu.Unlock() - if s.recErr != nil { - return - } - s.recErr = err - s.cancel() -} - -func (s *headlessParquetSink) err() error { - s.mu.Lock() - defer s.mu.Unlock() - return s.recErr -} - -func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) { - if tuiMode { - return "ior-tui-cpu.prof", "ior-tui-mem.prof", "ior-tui-trace.out", 10 * time.Second - } - return "ior.cpuprofile", "ior.memprofile", "", 0 -} |
