summaryrefslogtreecommitdiff
path: root/internal/ior.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ior.go')
-rw-r--r--internal/ior.go310
1 files changed, 0 insertions, 310 deletions
diff --git a/internal/ior.go b/internal/ior.go
index dba90c3..4bd5c8a 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -6,15 +6,10 @@ import (
"fmt"
"os"
"os/signal"
- "runtime"
- "runtime/pprof"
- "runtime/trace"
- "strings"
"sync"
"syscall"
"time"
- appconfig "ior/internal/config"
"ior/internal/event"
"ior/internal/flags"
"ior/internal/flamegraph"
@@ -22,8 +17,6 @@ import (
"ior/internal/parquet"
"ior/internal/probemanager"
"ior/internal/statsengine"
- "ior/internal/streamrow"
- "ior/internal/tracepoints"
"ior/internal/tui"
"ior/internal/tui/eventstream"
@@ -42,31 +35,11 @@ var (
errRootPrivilegesRequired = errors.New("tracing requires root privileges (run with sudo)")
)
-type libbpfTracepointProgram struct {
- prog *bpf.BPFProg
-}
-
-func (p libbpfTracepointProgram) AttachTracepoint(category, name string) (probemanager.Link, error) {
- return p.prog.AttachTracepoint(category, name)
-}
-
-type libbpfTracepointModule struct {
- module *bpf.Module
-}
-
type streamEventSink interface {
eventstream.Source
Push(eventstream.StreamEvent)
}
-func (m libbpfTracepointModule) GetProgram(progName string) (probemanager.Program, error) {
- prog, err := m.module.GetProgram(progName)
- if err != nil {
- return nil, err
- }
- return libbpfTracepointProgram{prog: prog}, nil
-}
-
// Run is the main entry point for the ior binary.
// cfg must be provided by the caller; it should not be fetched from the global singleton here.
func Run(cfg flags.Config) error {
@@ -314,69 +287,6 @@ func runTrace(cfg flags.Config) error {
return runTraceWithContext(context.Background(), cfg, nil, nil)
}
-func runHeadlessParquet(cfg flags.Config) error {
- if getEUID() != 0 {
- return errRootPrivilegesRequired
- }
-
- cfg = headlessParquetTraceConfig(cfg)
- logln := newLogger(true)
-
- bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg)
- if err != nil {
- return err
- }
- defer bpfModule.Close()
- defer mgr.Close()
- defer releaseBindings()
-
- ch, err := setupEventChannel(bpfModule)
- if err != nil {
- return err
- }
- ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln)
- defer cancel()
- defer stopSignals()
-
- profiling, err := setupProfiling(ctx, cfg, nil)
- if err != nil {
- return err
- }
-
- el, err := newEventLoop(newEventLoopConfig(cfg))
- if err != nil {
- return err
- }
-
- recorder := parquet.NewRecorder(parquet.RecorderConfig{})
- if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquetMetadata("headless")}); err != nil {
- return err
- }
-
- sink := newHeadlessParquetSink(recorder, cancel)
- configureEventLoopOutput(el, mgr, sink.configure)
- startTraceShutdownWatcher(ctx, true, el, profiling, logln)
-
- startTime := time.Now()
- el.run(ctx, ch)
- totalDuration := time.Since(startTime)
- <-profiling.done
-
- stopErr := recorder.Stop()
- if err := sink.err(); err != nil {
- if stopErr != nil && !errors.Is(stopErr, err) {
- return errors.Join(err, stopErr)
- }
- return err
- }
- if stopErr != nil {
- return stopErr
- }
-
- logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration)
- return nil
-}
-
func newEventLoopConfig(cfg flags.Config) eventLoopConfig {
fields := make([]string, len(cfg.CollapsedFields))
copy(fields, cfg.CollapsedFields)
@@ -395,15 +305,6 @@ func traceFilterFromConfig(cfg flags.Config) globalfilter.Filter {
return cfg.TraceFilter()
}
-type profilingControl struct {
- done chan struct{}
- enabled bool
- cpuProfile *os.File
- memProfile *os.File
- stopExecTrace func()
- stopOnce sync.Once
-}
-
func newLogger(verbose bool) func(...any) {
if !verbose {
return func(...any) {}
@@ -411,56 +312,6 @@ func newLogger(verbose bool) func(...any) {
return func(args ...any) { _, _ = fmt.Println(args...) }
}
-func setupBPFModuleError(stage string, err error) error {
- if err == nil {
- return nil
- }
- return fmt.Errorf("setup BPF module: %s: %w", stage, err)
-}
-
-func setupBPFModule(parentCtx context.Context, cfg flags.Config) (*bpf.Module, *probemanager.Manager, func(), error) {
- releaseBindings := func() {}
-
- bpfModule, stage, err := loadBPFModule()
- if err != nil {
- return nil, nil, releaseBindings, setupBPFModuleError(stage, err)
- }
- if err := resizeBPFMaps(cfg, bpfModule); err != nil {
- bpfModule.Close()
- return nil, nil, releaseBindings, setupBPFModuleError("resize maps", err)
- }
- if err := setBPFGlobals(cfg, bpfModule); err != nil {
- bpfModule.Close()
- return nil, nil, releaseBindings, setupBPFModuleError("set globals", err)
- }
- if err := bpfModule.BPFLoadObject(); err != nil {
- bpfModule.Close()
- return nil, nil, releaseBindings, setupBPFModuleError("load object", err)
- }
-
- mgr := probemanager.NewManager(libbpfTracepointModule{module: bpfModule})
- if err := mgr.AttachAll(cfg.ShouldIAttachTracepoint, tracepoints.List); err != nil {
- mgr.Close()
- bpfModule.Close()
- return nil, nil, releaseBindings, setupBPFModuleError("attach probes", err)
- }
- if bindings, ok := tui.RuntimeBindingsFromContext(parentCtx); ok {
- bindings.SetProbeManager(mgr)
- releaseBindings = func() { bindings.SetProbeManager(nil) }
- }
- return bpfModule, mgr, releaseBindings, nil
-}
-
-func setupEventChannel(bpfModule *bpf.Module) (chan []byte, error) {
- ch := make(chan []byte, appconfig.DefaultChannelBufferSize)
- rb, err := bpfModule.InitRingBuf("event_map", ch)
- if err != nil {
- return nil, err
- }
- rb.Poll(300)
- return ch, nil
-}
-
func setupTraceContext(parentCtx context.Context, cfg flags.Config, logln func(...any)) (context.Context, context.CancelFunc, func()) {
ctx := parentCtx
cancel := func() {}
@@ -489,88 +340,6 @@ func setupTraceContext(parentCtx context.Context, cfg flags.Config, logln func(.
return ctx, cancel, stopSignals
}
-func setupProfiling(ctx context.Context, cfg flags.Config, started chan<- struct{}) (*profilingControl, error) {
- control := &profilingControl{
- done: make(chan struct{}),
- stopExecTrace: func() {},
- }
- if !cfg.PprofEnable {
- close(control.done)
- return control, nil
- }
-
- control.enabled = true
- isTUIMode := started != nil
- cpuProfilePath, memProfilePath, execTracePath, execTraceDuration := profilingFilesForMode(isTUIMode)
-
- cpuProfile, err := os.Create(cpuProfilePath)
- if err != nil {
- return nil, err
- }
- memProfile, err := os.Create(memProfilePath)
- if err != nil {
- _ = cpuProfile.Close()
- return nil, err
- }
- control.cpuProfile = cpuProfile
- control.memProfile = memProfile
-
- if execTracePath != "" {
- execTraceProfile, err := os.Create(execTracePath)
- if err != nil {
- _ = cpuProfile.Close()
- _ = memProfile.Close()
- return nil, err
- }
- if err := trace.Start(execTraceProfile); err != nil {
- _ = cpuProfile.Close()
- _ = memProfile.Close()
- _ = execTraceProfile.Close()
- return nil, err
- }
- var stopOnce sync.Once
- control.stopExecTrace = func() {
- stopOnce.Do(func() {
- trace.Stop()
- _ = execTraceProfile.Close()
- })
- }
- go func() {
- timer := time.NewTimer(execTraceDuration)
- defer timer.Stop()
- select {
- case <-ctx.Done():
- case <-timer.C:
- }
- control.stopExecTrace()
- }()
- }
-
- if err := pprof.StartCPUProfile(cpuProfile); err != nil {
- control.stopExecTrace()
- _ = cpuProfile.Close()
- _ = memProfile.Close()
- return nil, err
- }
- return control, nil
-}
-
-func (p *profilingControl) stop(logln func(...any)) {
- p.stopOnce.Do(func() {
- if !p.enabled {
- return
- }
- logln("Stopping profiling and writing profile files")
- pprof.StopCPUProfile()
- runtime.GC()
- _ = pprof.WriteHeapProfile(p.memProfile)
- p.stopExecTrace()
- _ = p.cpuProfile.Close()
- _ = p.memProfile.Close()
- close(p.done)
- })
-}
-
func configureEventLoopOutput(el *eventLoop, mgr *probemanager.Manager, configure func(*eventLoop)) {
if configure != nil {
configure(el)
@@ -682,82 +451,3 @@ func signalTraceStarted(started chan<- struct{}) {
func shouldAutoStopByDuration(cfg flags.Config) bool {
return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg)
}
-
-func isHeadlessParquetMode(cfg flags.Config) bool {
- return strings.TrimSpace(cfg.ParquetPath) != ""
-}
-
-func hasHeadlessParquetContentFilters(cfg flags.Config) bool {
- return cfg.CommFilter != "" ||
- cfg.PathFilter != "" ||
- cfg.PidFilter > 0 ||
- cfg.TidFilter > 0 ||
- cfg.GlobalFilter.IsActive()
-}
-
-func headlessParquetTraceConfig(cfg flags.Config) flags.Config {
- out := cfg
- out.PlainMode = false
- out.FlamegraphOutput = false
- out.CommFilter = ""
- out.PathFilter = ""
- out.PidFilter = -1
- out.TidFilter = -1
- out.GlobalFilter = globalfilter.Filter{}
- return out
-}
-
-// parquetMetadata delegates to the canonical parquet.NewFileMetadata.
-func parquetMetadata(mode string) parquet.FileMetadata {
- return parquet.NewFileMetadata(mode)
-}
-
-type headlessParquetSink struct {
- recorder *parquet.Recorder
- seq *streamrow.Sequencer
- cancel context.CancelFunc
-
- mu sync.Mutex
- recErr error
-}
-
-func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink {
- return &headlessParquetSink{
- recorder: recorder,
- seq: streamrow.NewSequencer(0),
- cancel: cancel,
- }
-}
-
-func (s *headlessParquetSink) configure(el *eventLoop) {
- el.printCb = func(ep *event.Pair) {
- row := streamrow.New(s.seq.Next(), ep)
- if err := s.recorder.Record(row, 0); err != nil {
- s.fail(err)
- }
- ep.Recycle()
- }
-}
-
-func (s *headlessParquetSink) fail(err error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.recErr != nil {
- return
- }
- s.recErr = err
- s.cancel()
-}
-
-func (s *headlessParquetSink) err() error {
- s.mu.Lock()
- defer s.mu.Unlock()
- return s.recErr
-}
-
-func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) {
- if tuiMode {
- return "ior-tui-cpu.prof", "ior-tui-mem.prof", "ior-tui-trace.out", 10 * time.Second
- }
- return "ior.cpuprofile", "ior.memprofile", "", 0
-}