summary refs log tree commit diff
path: root/internal
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2026-03-13 22:05:25 +0200
committer Paul Buetow <paul@buetow.org> 2026-03-13 22:05:25 +0200
commit 81fdd28081922aaeb355a1f87cebaf85f93622c5 (patch)
tree efb747ac5995a4266f5e46cebdf3fef6c5ca8a26 /internal
parent 0d4354bab36c95cd4e8125d2d7b5b66de4ae5d11 (diff)
Refactor event loop into focused units (task 389)
Diffstat (limited to 'internal')
-rw-r--r-- internal/eventloop.go 1034
-rw-r--r-- internal/eventloop_comm.go 213
-rw-r--r-- internal/eventloop_exit.go 346
-rw-r--r-- internal/eventloop_runtime.go 271
-rw-r--r-- internal/eventloop_state.go 237
5 files changed, 1067 insertions, 1034 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index d500350..7a7236c 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -3,15 +3,8 @@ package internal
import "C"
import (
- "context"
"fmt"
- "os"
- "path/filepath"
"reflect"
- "sort"
- "strconv"
- "sync"
- "syscall"
"time"
"ior/internal/event"
@@ -44,204 +37,6 @@ type eventLoopConfig struct {
commResolver *commResolver
}
-type fdTracker struct {
- files map[int32]file.File
-}
-
-func newFDTracker(files map[int32]file.File) *fdTracker {
- if files == nil {
- files = make(map[int32]file.File)
- }
- return &fdTracker{files: files}
-}
-
-func (t *fdTracker) get(fd int32) (file.File, bool) {
- f, ok := t.files[fd]
- return f, ok
-}
-
-func (t *fdTracker) set(fd int32, f file.File) {
- t.files[fd] = f
-}
-
-func (t *fdTracker) delete(fd int32) {
- delete(t.files, fd)
-}
-
-func (t *fdTracker) closeRangeFrom(first int32) {
- for fd := range t.files {
- if fd >= first {
- delete(t.files, fd)
- }
- }
-}
-
-type commResolver struct {
- comms map[uint32]string
-
- mu sync.RWMutex
- pending map[uint32]struct{}
- closed bool
-
- lookupQueue chan uint32
- lookupWorkers int
- resolveFn func(uint32) string
- startWorkersOnce sync.Once
- workersWG sync.WaitGroup
- shutdownOnce sync.Once
-}
-
-func newCommResolver(comms map[uint32]string) *commResolver {
- if comms == nil {
- comms = make(map[uint32]string)
- }
- r := &commResolver{
- comms: comms,
- pending: make(map[uint32]struct{}),
- }
- r.ensureLookupConfig()
- return r
-}
-
-func (r *commResolver) ensureLookupConfig() {
- if r.lookupWorkers <= 0 {
- r.lookupWorkers = defaultCommLookupWorkers
- }
- if r.lookupQueue == nil {
- r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize)
- }
- if r.resolveFn == nil {
- r.resolveFn = resolveCommFromProc
- }
-}
-
-func (r *commResolver) startLookupWorkers() {
- r.ensureLookupConfig()
- r.mu.RLock()
- closed := r.closed
- r.mu.RUnlock()
- if closed {
- return
- }
- r.startWorkersOnce.Do(func() {
- for i := 0; i < r.lookupWorkers; i++ {
- r.workersWG.Add(1)
- go r.lookupWorker()
- }
- })
-}
-
-func (r *commResolver) lookupWorker() {
- defer r.workersWG.Done()
- for tid := range r.lookupQueue {
- comm := r.resolveFn(tid)
- r.mu.Lock()
- delete(r.pending, tid)
- if comm != "" {
- r.comms[tid] = comm
- }
- r.mu.Unlock()
- }
-}
-
-func (r *commResolver) seedTrackedPidComm(pidFilter int) {
- candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())}
- if pidFilter > 0 {
- candidates = append(candidates, uint32(pidFilter))
- }
-
- seen := make(map[uint32]struct{}, len(candidates))
- for _, tid := range candidates {
- if tid == 0 {
- continue
- }
- if _, ok := seen[tid]; ok {
- continue
- }
- seen[tid] = struct{}{}
- if comm := resolveCommFromProc(tid); comm != "" {
- r.setCached(tid, comm)
- continue
- }
- r.queueLookup(tid)
- }
-}
-
-func (r *commResolver) comm(tid uint32) string {
- if comm, ok := r.cached(tid); ok {
- return comm
- }
- r.queueLookup(tid)
- return ""
-}
-
-func (r *commResolver) cached(tid uint32) (string, bool) {
- r.mu.RLock()
- defer r.mu.RUnlock()
- comm, ok := r.comms[tid]
- return comm, ok
-}
-
-func (r *commResolver) setCached(tid uint32, comm string) {
- if comm == "" {
- return
- }
- r.mu.Lock()
- r.comms[tid] = comm
- r.mu.Unlock()
-}
-
-func (r *commResolver) queueLookup(tid uint32) {
- if tid == 0 {
- return
- }
- r.startLookupWorkers()
-
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.closed {
- return
- }
- if _, ok := r.comms[tid]; ok {
- return
- }
- if r.pending == nil {
- r.pending = make(map[uint32]struct{})
- }
- if _, ok := r.pending[tid]; ok {
- return
- }
- r.pending[tid] = struct{}{}
-
- // Keep event processing non-blocking if resolver workers are saturated.
- select {
- case r.lookupQueue <- tid:
- default:
- delete(r.pending, tid)
- }
-}
-
-func (r *commResolver) shutdown() {
- r.shutdownOnce.Do(func() {
- r.ensureLookupConfig()
- r.mu.Lock()
- r.closed = true
- for tid := range r.pending {
- delete(r.pending, tid)
- }
- queue := r.lookupQueue
- r.mu.Unlock()
- close(queue)
- r.workersWG.Wait()
- })
-}
-func (e *eventLoop) shutdownCommResolver() {
- if e.commResolver == nil {
- return
- }
- e.commResolver.shutdown()
-}
-
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
type tracepointExitHandler func(ep *event.Pair) bool
@@ -383,832 +178,3 @@ func (e *eventLoop) stats() string {
return stats
}
-
-func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
- defer close(e.done)
- defer e.shutdownCommResolver()
-
- if e.cfg.pprofEnable {
- fmt.Println("Profiling, press Ctrl+C to stop")
- }
- if e.cfg.plainMode && !e.cfg.pprofEnable {
- fmt.Println(event.EventStreamHeader)
- }
-
- e.startTime = time.Now()
- if e.printCb == nil {
- e.printCb = func(ep *event.Pair) { ep.Recycle() }
- }
- if e.cfg.synchronousRawProcessing {
- e.runSynchronously(ctx, rawCh)
- return
- }
- for ep := range e.events(ctx, rawCh) {
- e.printCb(ep)
- e.numSyscallsAfterFilter++
- }
-}
-
-func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
- pairs := make(chan *event.Pair, 1)
-
- for {
- select {
- case raw, ok := <-rawCh:
- if !ok {
- return
- }
- if len(raw) == 0 {
- continue
- }
- e.processRawEvent(raw, pairs)
- for {
- select {
- case ep := <-pairs:
- e.printCb(ep)
- e.numSyscallsAfterFilter++
- default:
- goto nextRaw
- }
- }
- case <-ctx.Done():
- fmt.Println("Stopping event loop")
- return
- }
- nextRaw:
- }
-}
-
-func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *event.Pair {
- ch := make(chan *event.Pair)
-
- go func() {
- defer close(ch)
-
- for {
- select {
- case raw, ok := <-rawCh:
- if !ok {
- return
- }
- if len(raw) == 0 {
- continue
- }
- e.processRawEvent(raw, ch)
- case <-ctx.Done():
- fmt.Println("Stopping event loop")
- return
- }
- }
- }()
-
- return ch
-}
-
-func (e *eventLoop) processRawEvent(raw []byte, ch chan<- *event.Pair) {
- e.numTracepoints++
- e.initRawHandlers()
- evType := types.EventType(raw[0])
- handler, ok := e.rawHandlers[evType]
- if !ok {
- e.notifyWarning(fmt.Sprintf("Dropped unhandled raw event type %d", evType))
- return
- }
- handler(raw, ch)
-}
-
-func (e *eventLoop) initRawHandlers() {
- if e.rawHandlers == nil {
- e.rawHandlers = make(map[types.EventType]rawEventHandler)
- }
- if len(e.rawHandlers) != 0 {
- return
- }
-
- e.rawHandlers[types.ENTER_OPEN_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- openEv, ok := decodeRawEvent(e, types.ENTER_OPEN_EVENT, raw, types.NewOpenEventFast)
- if !ok {
- return
- }
- if e.filter.MatchOpenEvent(openEv) {
- e.tracepointEntered(openEv)
- }
- }
- e.rawHandlers[types.EXIT_OPEN_EVENT] = func(raw []byte, ch chan<- *event.Pair) {
- retEv, ok := decodeRawEvent(e, types.EXIT_OPEN_EVENT, raw, types.NewRetEventFast)
- if !ok {
- return
- }
- e.tracepointExited(retEv, ch)
- }
- e.rawHandlers[types.ENTER_FD_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- fdEv, ok := decodeRawEvent(e, types.ENTER_FD_EVENT, raw, types.NewFdEventFast)
- if !ok {
- return
- }
- e.tracepointEntered(fdEv)
- }
- e.rawHandlers[types.EXIT_FD_EVENT] = func(raw []byte, ch chan<- *event.Pair) {
- fdEv, ok := decodeRawEvent(e, types.EXIT_FD_EVENT, raw, types.NewFdEventFast)
- if !ok {
- return
- }
- e.tracepointExited(fdEv, ch)
- }
- e.rawHandlers[types.ENTER_NULL_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- nullEv, ok := decodeRawEvent(e, types.ENTER_NULL_EVENT, raw, types.NewNullEventFast)
- if !ok {
- return
- }
- e.tracepointEntered(nullEv)
- }
- e.rawHandlers[types.EXIT_NULL_EVENT] = func(raw []byte, ch chan<- *event.Pair) {
- nullEv, ok := decodeRawEvent(e, types.EXIT_NULL_EVENT, raw, types.NewNullEventFast)
- if !ok {
- return
- }
- e.tracepointExited(nullEv, ch)
- }
- e.rawHandlers[types.EXIT_RET_EVENT] = func(raw []byte, ch chan<- *event.Pair) {
- retEv, ok := decodeRawEvent(e, types.EXIT_RET_EVENT, raw, types.NewRetEventFast)
- if !ok {
- return
- }
- e.tracepointExited(retEv, ch)
- }
- e.rawHandlers[types.ENTER_NAME_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- nameEv, ok := decodeRawEvent(e, types.ENTER_NAME_EVENT, raw, types.NewNameEventFast)
- if !ok {
- return
- }
- if e.filter.MatchNameEvent(nameEv) {
- e.tracepointEntered(nameEv)
- }
- }
- e.rawHandlers[types.ENTER_PATH_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- pathEv, ok := decodeRawEvent(e, types.ENTER_PATH_EVENT, raw, types.NewPathEventFast)
- if !ok {
- return
- }
- if e.filter.MatchPathEvent(pathEv) {
- e.tracepointEntered(pathEv)
- }
- }
- e.rawHandlers[types.ENTER_FCNTL_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- fcntlEv, ok := decodeRawEvent(e, types.ENTER_FCNTL_EVENT, raw, types.NewFcntlEventFast)
- if !ok {
- return
- }
- e.tracepointEntered(fcntlEv)
- }
- e.rawHandlers[types.ENTER_OPEN_BY_HANDLE_AT_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- openByHandleEv, ok := decodeRawEvent(e, types.ENTER_OPEN_BY_HANDLE_AT_EVENT, raw, types.NewOpenByHandleAtEventFast)
- if !ok {
- return
- }
- e.tracepointEntered(openByHandleEv)
- }
- e.rawHandlers[types.ENTER_DUP3_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
- dup3Ev, ok := decodeRawEvent(e, types.ENTER_DUP3_EVENT, raw, types.NewDup3EventFast)
- if !ok {
- return
- }
- e.tracepointEntered(dup3Ev)
- }
-}
-
-func decodeRawEvent[T any](e *eventLoop, eventType types.EventType, raw []byte, decode func([]byte) *T) (*T, bool) {
- decoded := decode(raw)
- if decoded == nil {
- e.dropMalformedRawEvent(eventType, raw)
- return nil, false
- }
- return decoded, true
-}
-
-func (e *eventLoop) tracepointEntered(enterEv event.Event) {
- tid := enterEv.GetTid()
- // Schedule comm lookup as early as possible to reduce races for short-lived processes.
- e.queueCommLookup(tid)
- if !e.filter.UsesCommFilter() {
- e.setEnterEvent(enterEv)
- return
- }
-
- switch enterEv.(type) {
- case *types.OpenEvent:
- e.setEnterEvent(enterEv)
- default:
- // Only, when we have a comm name
- if _, ok := e.cachedComm(tid); ok {
- e.setEnterEvent(enterEv)
- } else {
- e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv))
- }
- }
-}
-
-func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) {
- ep, ok := e.consumeEnterEvent(exitEv.GetTid())
- if !ok {
- exitEv.Recycle()
- return
- }
- ep.ExitEv = exitEv
- e.numSyscalls++
-
- // Expect ID one lower, otherwise, enter and exit tracepoints
- // don't match up. E.g.:
- // enterEv:SYS_ENTER_OPEN => exitEv:SYS_EXIT_OPEN
- if ep.EnterEv.GetTraceId()-1 != ep.ExitEv.GetTraceId() {
- e.numTracepointMismatches++
- e.notifyWarning("Dropped tracepoint pair with mismatched enter/exit IDs")
- ep.Recycle()
- return
- }
- if !e.handleTracepointExit(ep) {
- return
- }
- prevPairTime, _ := e.prevPairTimes[ep.EnterEv.GetTid()]
- ep.CalculateDurations(prevPairTime)
- e.prevPairTimes[ep.EnterEv.GetTid()] = ep.ExitEv.GetTime()
- e.freezePairForEmission(ep)
- ch <- ep
-}
-
-func (e *eventLoop) freezePairForEmission(ep *event.Pair) {
- fdFile, ok := ep.File.(*file.FdFile)
- if !ok {
- return
- }
- ep.File = fdFile.Dup(fdFile.FD())
-}
-
-func (e *eventLoop) initExitHandlers() {
- e.exitHandlers = map[reflect.Type]tracepointExitHandler{
- typeKey[*types.OpenEvent](): newTypedExitHandler(e, "Dropped malformed open enter event", e.handleOpenExit),
- typeKey[*types.NameEvent](): newTypedExitHandler(e, "Dropped malformed name enter event", e.handleNameExit),
- typeKey[*types.PathEvent](): newTypedExitHandler(e, "Dropped malformed path enter event", e.handlePathExit),
- typeKey[*types.FdEvent](): newTypedExitHandler(e, "Dropped malformed fd enter event", e.handleFdExit),
- typeKey[*types.Dup3Event](): newTypedExitHandler(e, "Dropped malformed dup3 enter event", e.handleDup3Exit),
- typeKey[*types.OpenByHandleAtEvent](): newTypedExitHandler(e, "Dropped malformed open_by_handle_at enter event", e.handleOpenByHandleAtExit),
- typeKey[*types.NullEvent](): newTypedExitHandler(e, "Dropped malformed null enter event", e.handleNullExit),
- typeKey[*types.FcntlEvent](): newTypedExitHandler(e, "Dropped malformed fcntl enter event", e.handleFcntlExit),
- }
-}
-
-func mustBeType[T event.Event](e *eventLoop, ep *event.Pair, message string) (T, bool) {
- enterEv, ok := ep.EnterEv.(T)
- if !ok {
- e.recyclePair(ep, message)
- var zero T
- return zero, false
- }
- return enterEv, true
-}
-
-func newTypedExitHandler[T event.Event](e *eventLoop, message string, handle func(*event.Pair, T) bool) tracepointExitHandler {
- return func(ep *event.Pair) bool {
- enterEv, ok := mustBeType[T](e, ep, message)
- if !ok {
- return false
- }
- return handle(ep, enterEv)
- }
-}
-
-func typeKey[T any]() reflect.Type {
- var zero T
- return reflect.TypeOf(zero)
-}
-
-func (e *eventLoop) exitHandlerRegistry() map[reflect.Type]tracepointExitHandler {
- if e.exitHandlers == nil {
- e.initExitHandlers()
- }
- return e.exitHandlers
-}
-
-func (e *eventLoop) handleTracepointExit(ep *event.Pair) bool {
- handler, ok := e.exitHandlerRegistry()[reflect.TypeOf(ep.EnterEv)]
- if !ok {
- e.recyclePair(ep, "Dropped malformed enter event")
- return false
- }
- return handler(ep)
-}
-
-func (e *eventLoop) handleOpenExit(ep *event.Pair, openEv *types.OpenEvent) bool {
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed open exit event")
- return false
- }
-
- comm := types.StringValue(openEv.Comm[:])
- ep.Comm = comm
- if fd := int32(retEvent.Ret); fd >= 0 {
- fdFile := file.NewFd(fd, types.StringValue(openEv.Filename[:]), openEv.Flags)
- e.fdState().set(fd, fdFile)
- ep.File = fdFile
- } else {
- // Keep path information for failed opens so error scenarios remain observable.
- ep.File = file.NewPathname(openEv.Filename[:])
- }
- e.setCachedComm(openEv.Tid, comm)
- return true
-}
-
-func (e *eventLoop) handleNameExit(ep *event.Pair, nameEv *types.NameEvent) bool {
- ep.File = file.NewOldnameNewname(nameEv.Oldname[:], nameEv.Newname[:])
- ep.Comm = e.comm(nameEv.GetTid())
- return true
-}
-
-func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *types.PathEvent) bool {
- if pathEv.GetTraceId().Name() == sysEnterNameToHandleAtName {
- retEv, ok := ep.ExitEv.(*types.RetEvent)
- if !ok || retEv.Ret < 0 {
- ep.Recycle()
- return false
- }
- e.pendingHandles[pathEv.GetTid()] = types.StringValue(pathEv.Pathname[:])
- ep.Recycle()
- return false
- }
-
- if ep.Is(types.SYS_ENTER_CREAT) {
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed creat exit event")
- return false
- }
- if fd := int32(retEvent.Ret); fd >= 0 {
- fdFile := file.NewFd(fd, types.StringValue(pathEv.Pathname[:]),
- syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC)
- e.fdState().set(fd, fdFile)
- ep.File = fdFile
- }
- } else {
- ep.File = file.NewPathname(pathEv.Pathname[:])
- }
- ep.Comm = e.comm(pathEv.GetTid())
- return true
-}
-
-func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool {
- fd := fdEv.Fd
- ep.File = e.resolveFdFile(fd, fdEv.Pid)
- if ep.Is(types.SYS_ENTER_CLOSE) {
- e.fdState().delete(fd)
- e.deleteProcFdCache(fd, fdEv.Pid)
- }
- if ep.Is(types.SYS_ENTER_CLOSE_RANGE) {
- // close_range provides (first, last), but fd_event only carries the first
- // argument, so we approximate by closing all tracked fds >= first.
- retEv, ok := ep.ExitEv.(*types.RetEvent)
- if ok && retEv.Ret == 0 {
- e.fdState().closeRangeFrom(fd)
- e.deleteProcFdCacheFrom(fd, fdEv.Pid)
- }
- }
- ep.Comm = e.comm(fdEv.GetTid())
- if !e.filter.MatchPair(ep) {
- ep.Recycle()
- return false
- }
-
- if ep.Is(types.SYS_ENTER_DUP) || ep.Is(types.SYS_ENTER_DUP2) {
- fdFile, ok := ep.File.(*file.FdFile)
- if !ok {
- e.recyclePair(ep, "Dropped malformed dup source event")
- return false
- }
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed dup exit event")
- return false
- }
- // Duplicating fd
- e.registerDup(fdFile, int32(retEvent.Ret), 0)
- }
- if ep.Is(types.SYS_ENTER_PIDFD_GETFD) {
- retEv, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed pidfd_getfd exit event")
- return false
- }
- if newFd := int32(retEv.Ret); newFd >= 0 {
- transferredFile := file.NewFdWithPid(newFd, fdEv.Pid)
- e.fdState().set(newFd, transferredFile)
- ep.File = transferredFile
- }
- }
- if retEv, ok := ep.ExitEv.(*types.RetEvent); ok {
- ep.Bytes = bytesFromRet(retEv)
- }
- return true
-}
-
-func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *types.Dup3Event) bool {
- fd := int32(dup3Ev.Fd)
- ep.File = e.resolveFdFile(fd, dup3Ev.Pid)
- ep.Comm = e.comm(dup3Ev.GetTid())
- if !e.filter.MatchPair(ep) {
- ep.Recycle()
- return false
- }
-
- fdFile, ok := ep.File.(*file.FdFile)
- if !ok {
- e.recyclePair(ep, "Dropped malformed dup3 source event")
- return false
- }
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed dup3 exit event")
- return false
- }
- e.registerDup(fdFile, int32(retEvent.Ret), dup3Ev.Flags&syscall.O_CLOEXEC)
- return true
-}
-
-func (e *eventLoop) handleOpenByHandleAtExit(ep *event.Pair, openByHandleEv *types.OpenByHandleAtEvent) bool {
- tid := openByHandleEv.GetTid()
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed open_by_handle_at exit event")
- return false
- }
-
- fd := int32(retEvent.Ret)
- if fd < 0 {
- ep.Recycle()
- return false
- }
-
- if pathname, ok := e.pendingHandles[tid]; ok {
- delete(e.pendingHandles, tid)
- fdFile := file.NewFd(fd, pathname, openByHandleEv.Flags)
- e.fdState().set(fd, fdFile)
- ep.File = fdFile
- } else {
- fdFile := file.NewFdWithPid(fd, openByHandleEv.Pid)
- if fdFile.Flags() == file.Flags(-1) {
- fdFile.SetFlags(openByHandleEv.Flags)
- }
- e.fdState().set(fd, fdFile)
- ep.File = fdFile
- }
- ep.Comm = e.comm(tid)
- return true
-}
-
-func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *types.NullEvent) bool {
- if ep.Is(types.SYS_ENTER_IO_URING_SETUP) {
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed io_uring_setup exit event")
- return false
- }
- if fd := int32(retEvent.Ret); fd >= 0 {
- fdFile := file.NewFdWithPid(fd, nullEv.Pid)
- e.fdState().set(fd, fdFile)
- ep.File = fdFile
- }
- }
- if ep.Is(types.SYS_ENTER_GETCWD) {
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed getcwd exit event")
- return false
- }
- if retEvent.Ret > 0 {
- if cwd, err := os.Readlink(fmt.Sprintf("/proc/%d/cwd", nullEv.GetTid())); err == nil {
- ep.File = file.NewPathname([]byte(cwd))
- }
- }
- }
- ep.Comm = e.comm(nullEv.GetTid())
- if !e.filter.MatchPair(ep) {
- ep.Recycle()
- return false
- }
- return true
-}
-
-func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *types.FcntlEvent) bool {
- ep.Comm = e.comm(fcntlEv.GetTid())
- fd := int32(fcntlEv.Fd)
- ep.File = e.resolveFdFile(fd, fcntlEv.Pid)
- if !e.filter.MatchPair(ep) {
- ep.Recycle()
- return false
- }
-
- retEvent, ok := ep.ExitEv.(*types.RetEvent)
- if !ok {
- e.recyclePair(ep, "Dropped malformed fcntl exit event")
- return false
- }
- // Syscall returned -1, nothing was changed with the fd
- if retEvent.Ret == -1 {
- return true
- }
-
- fdFile, ok := ep.File.(*file.FdFile)
- if !ok {
- e.recyclePair(ep, "Dropped malformed fcntl file event")
- return false
- }
-
- // See fcntl(2) for implementation details
- switch fcntlEv.Cmd {
- case syscall.F_SETFL:
- const canChange = syscall.O_APPEND | syscall.O_ASYNC | syscall.O_DIRECT | syscall.O_NOATIME | syscall.O_NONBLOCK
- fdFile.SetFlags(int32(fcntlEv.Arg) & int32(canChange))
- ep.File = fdFile
- e.fdState().set(fd, fdFile)
- case syscall.F_DUPFD:
- e.registerDup(fdFile, int32(retEvent.Ret), 0)
- case syscall.F_DUPFD_CLOEXEC:
- e.registerDup(fdFile, int32(retEvent.Ret), syscall.O_CLOEXEC)
- }
- return true
-}
-
-func (e *eventLoop) registerDup(fdFile *file.FdFile, newFd int32, extraFlags int32) {
- if newFd < 0 {
- return
- }
- duppedFdFile := fdFile.Dup(newFd)
- if extraFlags != 0 {
- duppedFdFile.AddFlags(extraFlags)
- }
- e.fdState().set(newFd, duppedFdFile)
-}
-
-func (e *eventLoop) recyclePair(ep *event.Pair, warning string) {
- e.notifyWarning(warning)
- ep.Recycle()
-}
-
-func (e *eventLoop) notifyWarning(message string) {
- if e.warningCb == nil || message == "" {
- return
- }
- e.warningCb(message)
-}
-
-func (e *eventLoop) dropMalformedRawEvent(evType types.EventType, raw []byte) {
- e.notifyWarning(fmt.Sprintf("Dropped malformed raw event type %d (len=%d)", evType, len(raw)))
-}
-
-func (e *eventLoop) resolveFdFile(fd int32, pid uint32) file.File {
- if fdFile, ok := e.fdState().get(fd); ok {
- return fdFile
- }
- if fd < 0 {
- return file.NewFd(fd, "", -1)
- }
-
- if cached, ok := e.cachedProcFdFile(fd, pid); ok {
- return cached
- }
-
- // Cache first procfs resolution to avoid repeated /proc lookups for hot unknown FDs.
- discovered := file.NewFdWithPid(fd, pid)
- e.setProcFdCache(fd, pid, discovered)
- return discovered
-}
-
-func (e *eventLoop) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) {
- key := procFdCacheKey(pid, fd)
- cache, ok := e.procFdCacheState()[key]
- if ok {
- e.procFdCacheAgeState()[key] = e.nextCacheAge()
- }
- return cache, ok
-}
-
-func (e *eventLoop) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) {
- key := procFdCacheKey(pid, fd)
- e.procFdCacheState()[key] = resolved
- e.procFdCacheAgeState()[key] = e.nextCacheAge()
- e.pruneProcFdCache()
-}
-
-func (e *eventLoop) deleteProcFdCache(fd int32, pid uint32) {
- e.deleteProcFdCacheKey(procFdCacheKey(pid, fd))
-}
-
-func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) {
- cache := e.procFdCacheState()
- for key := range cache {
- cachePid := uint32(key >> 32)
- cacheFd := int32(uint32(key))
- if cachePid == pid && cacheFd >= first {
- e.deleteProcFdCacheKey(key)
- }
- }
-}
-
-func (e *eventLoop) procFdCacheState() map[uint64]*file.FdFile {
- if e.procFdCache == nil {
- e.procFdCache = make(map[uint64]*file.FdFile)
- }
- return e.procFdCache
-}
-
-func (e *eventLoop) procFdCacheAgeState() map[uint64]uint64 {
- if e.procFdCacheAges == nil {
- e.procFdCacheAges = make(map[uint64]uint64)
- }
- return e.procFdCacheAges
-}
-
-func (e *eventLoop) enterEventAgeState() map[uint32]uint64 {
- if e.enterEvAges == nil {
- e.enterEvAges = make(map[uint32]uint64)
- }
- return e.enterEvAges
-}
-
-func (e *eventLoop) enterEventState() map[uint32]*event.Pair {
- if e.enterEvs == nil {
- e.enterEvs = make(map[uint32]*event.Pair)
- }
- return e.enterEvs
-}
-
-func (e *eventLoop) setEnterEvent(enterEv event.Event) {
- tid := enterEv.GetTid()
- pair := event.NewPair(enterEv)
- if prev, ok := e.enterEventState()[tid]; ok && prev != nil {
- prev.Recycle()
- }
- e.enterEventState()[tid] = pair
- e.enterEventAgeState()[tid] = e.nextCacheAge()
- e.prunePendingEnterEvents()
-}
-
-func (e *eventLoop) consumeEnterEvent(tid uint32) (*event.Pair, bool) {
- pair, ok := e.enterEventState()[tid]
- if !ok {
- return nil, false
- }
- delete(e.enterEventState(), tid)
- delete(e.enterEventAgeState(), tid)
- return pair, true
-}
-
-func (e *eventLoop) prunePendingEnterEvents() {
- state := e.enterEventState()
- limit := e.pendingEnterLimit()
- if len(state) <= limit {
- return
- }
- trimOldestPendingPairs(state, e.enterEventAgeState(), trimTarget(limit))
-}
-
-func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64, targetSize int) {
- excess := len(state) - targetSize
- if excess <= 0 {
- return
- }
- type pendingPairAge struct {
- tid uint32
- age uint64
- }
- oldest := make([]pendingPairAge, 0, len(state))
- for tid := range state {
- age := ages[tid]
- oldest = append(oldest, pendingPairAge{tid: tid, age: age})
- }
- sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age })
- for _, entry := range oldest[:excess] {
- if pair, ok := state[entry.tid]; ok && pair != nil {
- pair.Recycle()
- }
- delete(state, entry.tid)
- delete(ages, entry.tid)
- }
-}
-
-func (e *eventLoop) pruneProcFdCache() {
- state := e.procFdCacheState()
- limit := e.procFdCacheLimit()
- if len(state) <= limit {
- return
- }
- trimOldestProcFdEntries(state, e.procFdCacheAgeState(), trimTarget(limit))
-}
-
-func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint64, targetSize int) {
- excess := len(state) - targetSize
- if excess <= 0 {
- return
- }
- type procFdAge struct {
- key uint64
- age uint64
- }
- oldest := make([]procFdAge, 0, len(state))
- for key := range state {
- age := ages[key]
- oldest = append(oldest, procFdAge{key: key, age: age})
- }
- sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age })
- for _, entry := range oldest[:excess] {
- delete(state, entry.key)
- delete(ages, entry.key)
- }
-}
-
-func (e *eventLoop) deleteProcFdCacheKey(key uint64) {
- delete(e.procFdCacheState(), key)
- delete(e.procFdCacheAgeState(), key)
-}
-
-func (e *eventLoop) nextCacheAge() uint64 {
- e.cacheAge++
- return e.cacheAge
-}
-
-func (e *eventLoop) pendingEnterLimit() int {
- if e.maxPendingEnterEvs > 0 {
- return e.maxPendingEnterEvs
- }
- return defaultMaxPendingEnterEvs
-}
-
-func (e *eventLoop) procFdCacheLimit() int {
- if e.maxProcFdCacheSize > 0 {
- return e.maxProcFdCacheSize
- }
- return defaultMaxProcFdCacheSize
-}
-
-func trimTarget(limit int) int {
- target := limit - limit/cacheTrimDivisor
- if target < 1 {
- return 1
- }
- return target
-}
-
-func procFdCacheKey(pid uint32, fd int32) uint64 {
- return uint64(pid)<<32 | uint64(uint32(fd))
-}
-
-func (e *eventLoop) comm(tid uint32) string {
- return e.commState().comm(tid)
-}
-
-func (e *eventLoop) cachedComm(tid uint32) (string, bool) {
- return e.commState().cached(tid)
-}
-
-func (e *eventLoop) setCachedComm(tid uint32, comm string) {
- e.commState().setCached(tid, comm)
-}
-
-func (e *eventLoop) queueCommLookup(tid uint32) {
- e.commState().queueLookup(tid)
-}
-
-func procTidPathPrefix(tid uint32) string {
- return "/proc/" + strconv.FormatUint(uint64(tid), 10)
-}
-
-func resolveCommFromProc(tid uint32) string {
- procPath := procTidPathPrefix(tid)
- if data, err := os.ReadFile(procPath + "/comm"); err == nil {
- comm := string(data)
- if len(comm) > 0 && comm[len(comm)-1] == '\n' {
- comm = comm[:len(comm)-1]
- }
- if comm != "" {
- return comm
- }
- }
- if linkName, err := os.Readlink(procPath + "/exe"); err == nil {
- linkName = filepath.Base(linkName)
- return linkName
- }
- return ""
-}
-
-// bytesFromRet extracts the number of bytes transferred from a RetEvent.
-// Returns 0 for nil events, errors (Ret <= 0), or unclassified syscalls.
-func bytesFromRet(retEv *types.RetEvent) uint64 {
- if retEv == nil || retEv.Ret <= 0 {
- return 0
- }
- switch retEv.RetType {
- case types.READ_CLASSIFIED, types.WRITE_CLASSIFIED, types.TRANSFER_CLASSIFIED:
- return uint64(retEv.Ret)
- default:
- return 0
- }
-}
diff --git a/internal/eventloop_comm.go b/internal/eventloop_comm.go
new file mode 100644
index 0000000..6b77023
--- /dev/null
+++ b/internal/eventloop_comm.go
@@ -0,0 +1,213 @@
+package internal
+
+import (
+ "os"
+ "path/filepath"
+ "strconv"
+ "sync"
+)
+
+type commResolver struct {
+ comms map[uint32]string
+
+ mu sync.RWMutex
+ pending map[uint32]struct{}
+ closed bool
+
+ lookupQueue chan uint32
+ lookupWorkers int
+ resolveFn func(uint32) string
+ startWorkersOnce sync.Once
+ workersWG sync.WaitGroup
+ shutdownOnce sync.Once
+}
+
+func newCommResolver(comms map[uint32]string) *commResolver {
+ if comms == nil {
+ comms = make(map[uint32]string)
+ }
+ r := &commResolver{
+ comms: comms,
+ pending: make(map[uint32]struct{}),
+ }
+ r.ensureLookupConfig()
+ return r
+}
+
+func (r *commResolver) ensureLookupConfig() {
+ if r.lookupWorkers <= 0 {
+ r.lookupWorkers = defaultCommLookupWorkers
+ }
+ if r.lookupQueue == nil {
+ r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize)
+ }
+ if r.resolveFn == nil {
+ r.resolveFn = resolveCommFromProc
+ }
+}
+
+func (r *commResolver) startLookupWorkers() {
+ r.ensureLookupConfig()
+ r.mu.RLock()
+ closed := r.closed
+ r.mu.RUnlock()
+ if closed {
+ return
+ }
+ r.startWorkersOnce.Do(func() {
+ for i := 0; i < r.lookupWorkers; i++ {
+ r.workersWG.Add(1)
+ go r.lookupWorker()
+ }
+ })
+}
+
+func (r *commResolver) lookupWorker() {
+ defer r.workersWG.Done()
+ for tid := range r.lookupQueue {
+ comm := r.resolveFn(tid)
+ r.mu.Lock()
+ delete(r.pending, tid)
+ if comm != "" {
+ r.comms[tid] = comm
+ }
+ r.mu.Unlock()
+ }
+}
+
+func (r *commResolver) seedTrackedPidComm(pidFilter int) {
+ candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())}
+ if pidFilter > 0 {
+ candidates = append(candidates, uint32(pidFilter))
+ }
+
+ seen := make(map[uint32]struct{}, len(candidates))
+ for _, tid := range candidates {
+ if tid == 0 {
+ continue
+ }
+ if _, ok := seen[tid]; ok {
+ continue
+ }
+ seen[tid] = struct{}{}
+ if comm := resolveCommFromProc(tid); comm != "" {
+ r.setCached(tid, comm)
+ continue
+ }
+ r.queueLookup(tid)
+ }
+}
+
+func (r *commResolver) comm(tid uint32) string {
+ if comm, ok := r.cached(tid); ok {
+ return comm
+ }
+ r.queueLookup(tid)
+ return ""
+}
+
+func (r *commResolver) cached(tid uint32) (string, bool) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ comm, ok := r.comms[tid]
+ return comm, ok
+}
+
+func (r *commResolver) setCached(tid uint32, comm string) {
+ if comm == "" {
+ return
+ }
+ r.mu.Lock()
+ r.comms[tid] = comm
+ r.mu.Unlock()
+}
+
+func (r *commResolver) queueLookup(tid uint32) {
+ if tid == 0 {
+ return
+ }
+ r.startLookupWorkers()
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.closed {
+ return
+ }
+ if _, ok := r.comms[tid]; ok {
+ return
+ }
+ if r.pending == nil {
+ r.pending = make(map[uint32]struct{})
+ }
+ if _, ok := r.pending[tid]; ok {
+ return
+ }
+ r.pending[tid] = struct{}{}
+
+ // Keep event processing non-blocking if resolver workers are saturated.
+ select {
+ case r.lookupQueue <- tid:
+ default:
+ delete(r.pending, tid)
+ }
+}
+
+func (r *commResolver) shutdown() {
+ r.shutdownOnce.Do(func() {
+ r.ensureLookupConfig()
+ r.mu.Lock()
+ r.closed = true
+ for tid := range r.pending {
+ delete(r.pending, tid)
+ }
+ queue := r.lookupQueue
+ r.mu.Unlock()
+ close(queue)
+ r.workersWG.Wait()
+ })
+}
+
+func (e *eventLoop) shutdownCommResolver() {
+ if e.commResolver == nil {
+ return
+ }
+ e.commResolver.shutdown()
+}
+
+func (e *eventLoop) comm(tid uint32) string {
+ return e.commState().comm(tid)
+}
+
+func (e *eventLoop) cachedComm(tid uint32) (string, bool) {
+ return e.commState().cached(tid)
+}
+
+func (e *eventLoop) setCachedComm(tid uint32, comm string) {
+ e.commState().setCached(tid, comm)
+}
+
+func (e *eventLoop) queueCommLookup(tid uint32) {
+ e.commState().queueLookup(tid)
+}
+
+func procTidPathPrefix(tid uint32) string {
+ return "/proc/" + strconv.FormatUint(uint64(tid), 10)
+}
+
+func resolveCommFromProc(tid uint32) string {
+ procPath := procTidPathPrefix(tid)
+ if data, err := os.ReadFile(procPath + "/comm"); err == nil {
+ comm := string(data)
+ if len(comm) > 0 && comm[len(comm)-1] == '\n' {
+ comm = comm[:len(comm)-1]
+ }
+ if comm != "" {
+ return comm
+ }
+ }
+ if linkName, err := os.Readlink(procPath + "/exe"); err == nil {
+ linkName = filepath.Base(linkName)
+ return linkName
+ }
+ return ""
+}
diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go
new file mode 100644
index 0000000..7fe2469
--- /dev/null
+++ b/internal/eventloop_exit.go
@@ -0,0 +1,346 @@
+package internal
+
+import (
+ "fmt"
+ "os"
+ "reflect"
+ "syscall"
+
+ "ior/internal/event"
+ "ior/internal/file"
+ "ior/internal/types"
+)
+
+// initExitHandlers (re)builds the exit handler registry, keyed by the dynamic
+// type of the enter event. Each handler is wrapped so that an enter event of
+// an unexpected type is recycled with the given warning message.
+func (e *eventLoop) initExitHandlers() {
+	registry := make(map[reflect.Type]tracepointExitHandler, 8)
+
+	registry[typeKey[*types.OpenEvent]()] = newTypedExitHandler(e, "Dropped malformed open enter event", e.handleOpenExit)
+	registry[typeKey[*types.NameEvent]()] = newTypedExitHandler(e, "Dropped malformed name enter event", e.handleNameExit)
+	registry[typeKey[*types.PathEvent]()] = newTypedExitHandler(e, "Dropped malformed path enter event", e.handlePathExit)
+	registry[typeKey[*types.FdEvent]()] = newTypedExitHandler(e, "Dropped malformed fd enter event", e.handleFdExit)
+	registry[typeKey[*types.Dup3Event]()] = newTypedExitHandler(e, "Dropped malformed dup3 enter event", e.handleDup3Exit)
+	registry[typeKey[*types.OpenByHandleAtEvent]()] = newTypedExitHandler(e, "Dropped malformed open_by_handle_at enter event", e.handleOpenByHandleAtExit)
+	registry[typeKey[*types.NullEvent]()] = newTypedExitHandler(e, "Dropped malformed null enter event", e.handleNullExit)
+	registry[typeKey[*types.FcntlEvent]()] = newTypedExitHandler(e, "Dropped malformed fcntl enter event", e.handleFcntlExit)
+
+	e.exitHandlers = registry
+}
+
+// mustBeType asserts that the pair's enter event has type T.
+//
+// On success it returns the typed event and true. On failure the pair is
+// recycled with message as warning and the zero value of T and false are
+// returned; the caller must not use the pair afterwards.
+func mustBeType[T event.Event](e *eventLoop, ep *event.Pair, message string) (T, bool) {
+	if typed, matches := ep.EnterEv.(T); matches {
+		return typed, true
+	}
+	e.recyclePair(ep, message)
+	var none T
+	return none, false
+}
+
+// newTypedExitHandler adapts a handler that expects an enter event of type T
+// into a tracepointExitHandler. The returned handler reports false without
+// calling handle when the pair's enter event is not a T (the pair is then
+// recycled with message as warning).
+func newTypedExitHandler[T event.Event](e *eventLoop, message string, handle func(*event.Pair, T) bool) tracepointExitHandler {
+	return func(ep *event.Pair) bool {
+		if enterEv, ok := mustBeType[T](e, ep, message); ok {
+			return handle(ep, enterEv)
+		}
+		return false
+	}
+}
+
+func typeKey[T any]() reflect.Type {
+ var zero T
+ return reflect.TypeOf(zero)
+}
+
+// exitHandlerRegistry returns the exit handler registry, building it on first
+// use.
+func (e *eventLoop) exitHandlerRegistry() map[reflect.Type]tracepointExitHandler {
+	if e.exitHandlers != nil {
+		return e.exitHandlers
+	}
+	e.initExitHandlers()
+	return e.exitHandlers
+}
+
+// handleTracepointExit dispatches a completed enter/exit pair to the handler
+// registered for the enter event's dynamic type. It returns the handler's
+// result, or false after recycling the pair when no handler is registered.
+func (e *eventLoop) handleTracepointExit(ep *event.Pair) bool {
+	enterType := reflect.TypeOf(ep.EnterEv)
+	if handle, found := e.exitHandlerRegistry()[enterType]; found {
+		return handle(ep)
+	}
+	e.recyclePair(ep, "Dropped malformed enter event")
+	return false
+}
+
+// handleOpenExit completes an open-style syscall pair.
+//
+// The comm carried by the enter event is stored on the pair and cached for the
+// thread. A non-negative return value is treated as the new fd and registered
+// in the fd tracker together with the file name and open flags; otherwise only
+// the path name is attached. It returns false (pair recycled) when the exit
+// event is not a RetEvent.
+func (e *eventLoop) handleOpenExit(ep *event.Pair, openEv *types.OpenEvent) bool {
+	ret, ok := ep.ExitEv.(*types.RetEvent)
+	if !ok {
+		e.recyclePair(ep, "Dropped malformed open exit event")
+		return false
+	}
+
+	name := types.StringValue(openEv.Comm[:])
+	ep.Comm = name
+
+	openedFd := int32(ret.Ret)
+	if openedFd < 0 {
+		// Failed opens still carry their path so that errors stay observable.
+		ep.File = file.NewPathname(openEv.Filename[:])
+	} else {
+		opened := file.NewFd(openedFd, types.StringValue(openEv.Filename[:]), openEv.Flags)
+		e.fdState().set(openedFd, opened)
+		ep.File = opened
+	}
+
+	e.setCachedComm(openEv.Tid, name)
+	return true
+}
+
+// handleNameExit completes a syscall pair whose enter event carries an old
+// and a new name: both names are attached as the pair's file and the comm is
+// resolved from the thread id. It always returns true.
+func (e *eventLoop) handleNameExit(ep *event.Pair, nameEv *types.NameEvent) bool {
+	oldName, newName := nameEv.Oldname[:], nameEv.Newname[:]
+	ep.File = file.NewOldnameNewname(oldName, newName)
+
+	tid := nameEv.GetTid()
+	ep.Comm = e.comm(tid)
+	return true
+}
+
+// handlePathExit completes a syscall pair whose enter event carries a single
+// path name.
+//
+// name_to_handle_at pairs are never emitted: on success (non-negative return)
+// the path is remembered per thread in pendingHandles for a later
+// open_by_handle_at, and the pair is recycled in every case.
+//
+// For creat, a non-negative return value is registered as a new fd opened with
+// O_CREAT|O_WRONLY|O_TRUNC. A failed creat keeps its path name, matching how
+// handleOpenExit treats failed opens, so the error remains attributable to a
+// file. All other path syscalls simply attach the path name.
+//
+// It returns true when the pair should be emitted; when it returns false the
+// pair has already been recycled.
+func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *types.PathEvent) bool {
+	if pathEv.GetTraceId().Name() == sysEnterNameToHandleAtName {
+		if retEv, ok := ep.ExitEv.(*types.RetEvent); ok && retEv.Ret >= 0 {
+			e.pendingHandles[pathEv.GetTid()] = types.StringValue(pathEv.Pathname[:])
+		}
+		ep.Recycle()
+		return false
+	}
+
+	if ep.Is(types.SYS_ENTER_CREAT) {
+		retEvent, ok := ep.ExitEv.(*types.RetEvent)
+		if !ok {
+			e.recyclePair(ep, "Dropped malformed creat exit event")
+			return false
+		}
+		if fd := int32(retEvent.Ret); fd >= 0 {
+			fdFile := file.NewFd(fd, types.StringValue(pathEv.Pathname[:]),
+				syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC)
+			e.fdState().set(fd, fdFile)
+			ep.File = fdFile
+		} else {
+			// Keep path information for failed creat calls so error scenarios
+			// remain observable, as is done for failed opens.
+			ep.File = file.NewPathname(pathEv.Pathname[:])
+		}
+	} else {
+		ep.File = file.NewPathname(pathEv.Pathname[:])
+	}
+	ep.Comm = e.comm(pathEv.GetTid())
+	return true
+}
+
+// handleFdExit completes a syscall pair whose enter event carries a single fd.
+//
+// The fd is resolved to a file first. State updates for close and close_range
+// happen before the filter is consulted; duplication (dup, dup2) and
+// pidfd_getfd bookkeeping happen only for pairs that pass the filter. When the
+// exit event is a RetEvent, the transferred byte count is derived from it.
+//
+// It returns true when the pair should be emitted; when it returns false the
+// pair has already been recycled.
+func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool {
+	srcFd, pid := fdEv.Fd, fdEv.Pid
+	ret, hasRet := ep.ExitEv.(*types.RetEvent)
+
+	ep.File = e.resolveFdFile(srcFd, pid)
+	if ep.Is(types.SYS_ENTER_CLOSE) {
+		e.fdState().delete(srcFd)
+		e.deleteProcFdCache(srcFd, pid)
+	}
+	// close_range takes (first, last) but the fd event only carries the first
+	// argument, so every tracked fd >= first is dropped as an approximation.
+	if ep.Is(types.SYS_ENTER_CLOSE_RANGE) && hasRet && ret.Ret == 0 {
+		e.fdState().closeRangeFrom(srcFd)
+		e.deleteProcFdCacheFrom(srcFd, pid)
+	}
+
+	ep.Comm = e.comm(fdEv.GetTid())
+	if !e.filter.MatchPair(ep) {
+		ep.Recycle()
+		return false
+	}
+
+	if ep.Is(types.SYS_ENTER_DUP) || ep.Is(types.SYS_ENTER_DUP2) {
+		source, isFdFile := ep.File.(*file.FdFile)
+		switch {
+		case !isFdFile:
+			e.recyclePair(ep, "Dropped malformed dup source event")
+			return false
+		case !hasRet:
+			e.recyclePair(ep, "Dropped malformed dup exit event")
+			return false
+		}
+		// The return value is the duplicated fd.
+		e.registerDup(source, int32(ret.Ret), 0)
+	}
+
+	if ep.Is(types.SYS_ENTER_PIDFD_GETFD) {
+		if !hasRet {
+			e.recyclePair(ep, "Dropped malformed pidfd_getfd exit event")
+			return false
+		}
+		if newFd := int32(ret.Ret); newFd >= 0 {
+			received := file.NewFdWithPid(newFd, pid)
+			e.fdState().set(newFd, received)
+			ep.File = received
+		}
+	}
+
+	if hasRet {
+		ep.Bytes = bytesFromRet(ret)
+	}
+	return true
+}
+
+// handleDup3Exit completes a dup3 pair: the source fd is resolved, the pair is
+// filtered, and the returned fd is registered as a duplicate of the source,
+// carrying O_CLOEXEC when the dup3 flags request it.
+//
+// It returns true when the pair should be emitted; when it returns false the
+// pair has already been recycled.
+func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *types.Dup3Event) bool {
+	srcFd := int32(dup3Ev.Fd)
+	ep.File = e.resolveFdFile(srcFd, dup3Ev.Pid)
+	ep.Comm = e.comm(dup3Ev.GetTid())
+	if matched := e.filter.MatchPair(ep); !matched {
+		ep.Recycle()
+		return false
+	}
+
+	source, isFdFile := ep.File.(*file.FdFile)
+	if !isFdFile {
+		e.recyclePair(ep, "Dropped malformed dup3 source event")
+		return false
+	}
+	ret, hasRet := ep.ExitEv.(*types.RetEvent)
+	if !hasRet {
+		e.recyclePair(ep, "Dropped malformed dup3 exit event")
+		return false
+	}
+
+	cloexec := dup3Ev.Flags & syscall.O_CLOEXEC
+	e.registerDup(source, int32(ret.Ret), cloexec)
+	return true
+}
+
+// handleOpenByHandleAtExit completes an open_by_handle_at pair.
+//
+// Failed calls (negative return) are recycled and not emitted. On success the
+// new fd is registered: if the thread has a path remembered in pendingHandles
+// that path is consumed and used; otherwise the file is built from the fd and
+// pid, and the syscall's flags are applied when the built file reports
+// file.Flags(-1).
+func (e *eventLoop) handleOpenByHandleAtExit(ep *event.Pair, openByHandleEv *types.OpenByHandleAtEvent) bool {
+	ret, hasRet := ep.ExitEv.(*types.RetEvent)
+	if !hasRet {
+		e.recyclePair(ep, "Dropped malformed open_by_handle_at exit event")
+		return false
+	}
+	newFd := int32(ret.Ret)
+	if newFd < 0 {
+		ep.Recycle()
+		return false
+	}
+
+	tid := openByHandleEv.GetTid()
+	pathname, remembered := e.pendingHandles[tid]
+	if remembered {
+		delete(e.pendingHandles, tid)
+		opened := file.NewFd(newFd, pathname, openByHandleEv.Flags)
+		e.fdState().set(newFd, opened)
+		ep.File = opened
+	} else {
+		opened := file.NewFdWithPid(newFd, openByHandleEv.Pid)
+		if opened.Flags() == file.Flags(-1) {
+			opened.SetFlags(openByHandleEv.Flags)
+		}
+		e.fdState().set(newFd, opened)
+		ep.File = opened
+	}
+
+	ep.Comm = e.comm(tid)
+	return true
+}
+
+// handleNullExit completes a syscall pair whose enter event carries no
+// arguments of interest.
+//
+// io_uring_setup registers its non-negative return value as a new fd. getcwd
+// with a positive return value attaches the thread's current working directory,
+// read from the procfs cwd link. The pair is then filtered.
+//
+// It returns true when the pair should be emitted; when it returns false the
+// pair has already been recycled.
+func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *types.NullEvent) bool {
+	ret, hasRet := ep.ExitEv.(*types.RetEvent)
+
+	if ep.Is(types.SYS_ENTER_IO_URING_SETUP) {
+		if !hasRet {
+			e.recyclePair(ep, "Dropped malformed io_uring_setup exit event")
+			return false
+		}
+		if ringFd := int32(ret.Ret); ringFd >= 0 {
+			ring := file.NewFdWithPid(ringFd, nullEv.Pid)
+			e.fdState().set(ringFd, ring)
+			ep.File = ring
+		}
+	}
+
+	if ep.Is(types.SYS_ENTER_GETCWD) {
+		if !hasRet {
+			e.recyclePair(ep, "Dropped malformed getcwd exit event")
+			return false
+		}
+		if ret.Ret > 0 {
+			cwd, err := os.Readlink(procTidPathPrefix(nullEv.GetTid()) + "/cwd")
+			if err == nil {
+				ep.File = file.NewPathname([]byte(cwd))
+			}
+		}
+	}
+
+	ep.Comm = e.comm(nullEv.GetTid())
+	if e.filter.MatchPair(ep) {
+		return true
+	}
+	ep.Recycle()
+	return false
+}
+
+// handleFcntlExit completes an fcntl pair and mirrors fd state changes made by
+// successful F_SETFL, F_DUPFD and F_DUPFD_CLOEXEC commands into the fd tracker.
+//
+// Failed calls are emitted unchanged. Failure is any negative return value:
+// the other exit handlers in this file already treat Ret < 0 as failure, since
+// the raw syscall return carries -errno rather than the libc-style -1.
+//
+// It returns true when the pair should be emitted; when it returns false the
+// pair has already been recycled.
+func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *types.FcntlEvent) bool {
+	ep.Comm = e.comm(fcntlEv.GetTid())
+	fd := int32(fcntlEv.Fd)
+	ep.File = e.resolveFdFile(fd, fcntlEv.Pid)
+	if !e.filter.MatchPair(ep) {
+		ep.Recycle()
+		return false
+	}
+
+	retEvent, ok := ep.ExitEv.(*types.RetEvent)
+	if !ok {
+		e.recyclePair(ep, "Dropped malformed fcntl exit event")
+		return false
+	}
+	// The syscall failed, so nothing was changed with the fd. None of the
+	// commands handled below returns a negative value on success.
+	if retEvent.Ret < 0 {
+		return true
+	}
+
+	fdFile, ok := ep.File.(*file.FdFile)
+	if !ok {
+		e.recyclePair(ep, "Dropped malformed fcntl file event")
+		return false
+	}
+
+	// See fcntl(2) for implementation details
+	switch fcntlEv.Cmd {
+	case syscall.F_SETFL:
+		// F_SETFL can only change these file status flags.
+		const canChange = syscall.O_APPEND | syscall.O_ASYNC | syscall.O_DIRECT | syscall.O_NOATIME | syscall.O_NONBLOCK
+		fdFile.SetFlags(int32(fcntlEv.Arg) & int32(canChange))
+		ep.File = fdFile
+		e.fdState().set(fd, fdFile)
+	case syscall.F_DUPFD:
+		e.registerDup(fdFile, int32(retEvent.Ret), 0)
+	case syscall.F_DUPFD_CLOEXEC:
+		e.registerDup(fdFile, int32(retEvent.Ret), syscall.O_CLOEXEC)
+	}
+	return true
+}
+
+// registerDup tracks newFd as a duplicate of fdFile, adding extraFlags to the
+// duplicate when they are non-zero. A negative newFd (failed duplication) is
+// ignored.
+func (e *eventLoop) registerDup(fdFile *file.FdFile, newFd int32, extraFlags int32) {
+	if newFd >= 0 {
+		clone := fdFile.Dup(newFd)
+		if extraFlags != 0 {
+			clone.AddFlags(extraFlags)
+		}
+		e.fdState().set(newFd, clone)
+	}
+}
+
+// recyclePair reports warning through the warning callback (when a callback
+// is set and the warning is non-empty) and then recycles the pair.
+func (e *eventLoop) recyclePair(ep *event.Pair, warning string) {
+	if e.warningCb != nil && warning != "" {
+		e.warningCb(warning)
+	}
+	ep.Recycle()
+}
+
+// notifyWarning passes message to the warning callback. It does nothing when
+// no callback is set or the message is empty.
+func (e *eventLoop) notifyWarning(message string) {
+	if e.warningCb != nil && message != "" {
+		e.warningCb(message)
+	}
+}
+
+// dropMalformedRawEvent emits a warning naming the event type and the length
+// of the raw buffer that could not be decoded.
+func (e *eventLoop) dropMalformedRawEvent(evType types.EventType, raw []byte) {
+	warning := fmt.Sprintf("Dropped malformed raw event type %d (len=%d)", evType, len(raw))
+	e.notifyWarning(warning)
+}
+
+// bytesFromRet reports how many bytes a syscall transferred according to its
+// RetEvent. Only read-, write- and transfer-classified return values count;
+// nil events, non-positive return values and every other classification
+// yield 0.
+func bytesFromRet(retEv *types.RetEvent) uint64 {
+	if retEv == nil {
+		return 0
+	}
+	if retEv.Ret <= 0 {
+		return 0
+	}
+	if kind := retEv.RetType; kind == types.READ_CLASSIFIED || kind == types.WRITE_CLASSIFIED || kind == types.TRANSFER_CLASSIFIED {
+		return uint64(retEv.Ret)
+	}
+	return 0
+}
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
new file mode 100644
index 0000000..12d9f12
--- /dev/null
+++ b/internal/eventloop_runtime.go
@@ -0,0 +1,271 @@
+package internal
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "ior/internal/event"
+ "ior/internal/file"
+ "ior/internal/types"
+)
+
+// run drives the event loop until rawCh is closed or ctx is cancelled.
+//
+// It prints the profiling notice or, in plain mode without profiling, the
+// event stream header, records the start time, and installs a recycling print
+// callback when none is set. Pairs are then processed either synchronously or
+// through the asynchronous events channel, depending on the configuration.
+// On return the comm resolver is shut down and e.done is closed.
+func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
+	defer close(e.done)
+	defer e.shutdownCommResolver()
+
+	switch {
+	case e.cfg.pprofEnable:
+		fmt.Println("Profiling, press Ctrl+C to stop")
+	case e.cfg.plainMode:
+		fmt.Println(event.EventStreamHeader)
+	}
+
+	e.startTime = time.Now()
+	if e.printCb == nil {
+		e.printCb = func(ep *event.Pair) { ep.Recycle() }
+	}
+
+	if !e.cfg.synchronousRawProcessing {
+		for ep := range e.events(ctx, rawCh) {
+			e.printCb(ep)
+			e.numSyscallsAfterFilter++
+		}
+		return
+	}
+	e.runSynchronously(ctx, rawCh)
+}
+
+// runSynchronously processes raw events and prints the resulting pairs on the
+// calling goroutine. It returns when rawCh is closed or ctx is cancelled.
+//
+// Pairs produced while processing one raw event are collected in a channel
+// with a buffer of one and drained before the next raw event is read.
+func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
+	pairs := make(chan *event.Pair, 1)
+
+	for {
+		select {
+		case <-ctx.Done():
+			fmt.Println("Stopping event loop")
+			return
+		case raw, open := <-rawCh:
+			if !open {
+				return
+			}
+			if len(raw) == 0 {
+				continue
+			}
+			e.processRawEvent(raw, pairs)
+
+			// Hand every buffered pair to the print callback without blocking.
+		drain:
+			for {
+				select {
+				case ep := <-pairs:
+					e.printCb(ep)
+					e.numSyscallsAfterFilter++
+				default:
+					break drain
+				}
+			}
+		}
+	}
+}
+
+// events starts a goroutine that turns raw events from rawCh into pairs and
+// returns the unbuffered channel the pairs are sent on. The goroutine stops,
+// closing the returned channel, when rawCh is closed or ctx is cancelled.
+// Empty raw buffers are skipped.
+func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *event.Pair {
+	out := make(chan *event.Pair)
+
+	pump := func() {
+		defer close(out)
+		for {
+			select {
+			case <-ctx.Done():
+				fmt.Println("Stopping event loop")
+				return
+			case raw, open := <-rawCh:
+				if !open {
+					return
+				}
+				if len(raw) == 0 {
+					continue
+				}
+				e.processRawEvent(raw, out)
+			}
+		}
+	}
+	go pump()
+
+	return out
+}
+
+// processRawEvent decodes one raw event and dispatches it to the handler
+// registered for its event type, which is taken from the first byte of raw.
+// Pairs completed by the handler are sent on ch.
+//
+// An empty buffer is ignored without being counted, matching how the loop's
+// callers skip empty buffers; this also guards the raw[0] access. Event types
+// without a registered handler are counted and dropped with a warning.
+func (e *eventLoop) processRawEvent(raw []byte, ch chan<- *event.Pair) {
+	if len(raw) == 0 {
+		return
+	}
+	e.numTracepoints++
+	e.initRawHandlers()
+	evType := types.EventType(raw[0])
+	handler, ok := e.rawHandlers[evType]
+	if !ok {
+		e.notifyWarning(fmt.Sprintf("Dropped unhandled raw event type %d", evType))
+		return
+	}
+	handler(raw, ch)
+}
+
+// initRawHandlers populates the raw event handler table on first use; it is a
+// no-op when the table already has entries.
+//
+// Enter handlers decode the raw buffer and record the enter event (open, name
+// and path events only when they pass the corresponding filter). Exit handlers
+// decode the raw buffer and complete the pending pair, which may be sent on
+// the handler's channel. Buffers that fail to decode are dropped with a
+// warning by decodeRawEvent.
+func (e *eventLoop) initRawHandlers() {
+	if len(e.rawHandlers) != 0 {
+		return
+	}
+	if e.rawHandlers == nil {
+		e.rawHandlers = make(map[types.EventType]rawEventHandler)
+	}
+	table := e.rawHandlers
+
+	table[types.ENTER_OPEN_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_OPEN_EVENT, raw, types.NewOpenEventFast); ok && e.filter.MatchOpenEvent(ev) {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.EXIT_OPEN_EVENT] = func(raw []byte, out chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.EXIT_OPEN_EVENT, raw, types.NewRetEventFast); ok {
+			e.tracepointExited(ev, out)
+		}
+	}
+	table[types.ENTER_FD_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_FD_EVENT, raw, types.NewFdEventFast); ok {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.EXIT_FD_EVENT] = func(raw []byte, out chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.EXIT_FD_EVENT, raw, types.NewFdEventFast); ok {
+			e.tracepointExited(ev, out)
+		}
+	}
+	table[types.ENTER_NULL_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_NULL_EVENT, raw, types.NewNullEventFast); ok {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.EXIT_NULL_EVENT] = func(raw []byte, out chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.EXIT_NULL_EVENT, raw, types.NewNullEventFast); ok {
+			e.tracepointExited(ev, out)
+		}
+	}
+	table[types.EXIT_RET_EVENT] = func(raw []byte, out chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.EXIT_RET_EVENT, raw, types.NewRetEventFast); ok {
+			e.tracepointExited(ev, out)
+		}
+	}
+	table[types.ENTER_NAME_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_NAME_EVENT, raw, types.NewNameEventFast); ok && e.filter.MatchNameEvent(ev) {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.ENTER_PATH_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_PATH_EVENT, raw, types.NewPathEventFast); ok && e.filter.MatchPathEvent(ev) {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.ENTER_FCNTL_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_FCNTL_EVENT, raw, types.NewFcntlEventFast); ok {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.ENTER_OPEN_BY_HANDLE_AT_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_OPEN_BY_HANDLE_AT_EVENT, raw, types.NewOpenByHandleAtEventFast); ok {
+			e.tracepointEntered(ev)
+		}
+	}
+	table[types.ENTER_DUP3_EVENT] = func(raw []byte, _ chan<- *event.Pair) {
+		if ev, ok := decodeRawEvent(e, types.ENTER_DUP3_EVENT, raw, types.NewDup3EventFast); ok {
+			e.tracepointEntered(ev)
+		}
+	}
+}
+
+// decodeRawEvent runs decode on raw. A nil result is treated as a malformed
+// event: a warning naming eventType is emitted and (nil, false) is returned.
+// Otherwise the decoded event and true are returned.
+func decodeRawEvent[T any](e *eventLoop, eventType types.EventType, raw []byte, decode func([]byte) *T) (*T, bool) {
+	if decoded := decode(raw); decoded != nil {
+		return decoded, true
+	}
+	e.dropMalformedRawEvent(eventType, raw)
+	return nil, false
+}
+
+func (e *eventLoop) tracepointEntered(enterEv event.Event) {
+ tid := enterEv.GetTid()
+ // Schedule comm lookup as early as possible to reduce races for short-lived processes.
+ e.queueCommLookup(tid)
+ if !e.filter.UsesCommFilter() {
+ e.setEnterEvent(enterEv)
+ return
+ }
+
+ switch enterEv.(type) {
+ case *types.OpenEvent:
+ e.setEnterEvent(enterEv)
+ default:
+ // Only, when we have a comm name
+ if _, ok := e.cachedComm(tid); ok {
+ e.setEnterEvent(enterEv)
+ } else {
+ e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv))
+ }
+ }
+}
+
+// tracepointExited pairs exitEv with the pending enter event of its thread
+// and, if the pair is valid and accepted by its exit handler, sends it on ch.
+//
+// An exit event without a pending enter event is recycled. A pair whose exit
+// trace ID is not exactly one lower than its enter trace ID is counted as a
+// mismatch and dropped with a warning. For emitted pairs the durations are
+// calculated against the thread's previous pair time, which is then updated.
+func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) {
+	pair, pending := e.consumeEnterEvent(exitEv.GetTid())
+	if !pending {
+		exitEv.Recycle()
+		return
+	}
+	pair.ExitEv = exitEv
+	e.numSyscalls++
+
+	// Enter and exit tracepoints belong together when the exit ID is one
+	// lower than the enter ID, e.g.:
+	// enterEv:SYS_ENTER_OPEN => exitEv:SYS_EXIT_OPEN
+	if enterID, exitID := pair.EnterEv.GetTraceId(), pair.ExitEv.GetTraceId(); enterID-1 != exitID {
+		e.numTracepointMismatches++
+		e.notifyWarning("Dropped tracepoint pair with mismatched enter/exit IDs")
+		pair.Recycle()
+		return
+	}
+	if accepted := e.handleTracepointExit(pair); !accepted {
+		return
+	}
+
+	pair.CalculateDurations(e.prevPairTimes[pair.EnterEv.GetTid()])
+	e.prevPairTimes[pair.EnterEv.GetTid()] = pair.ExitEv.GetTime()
+	e.freezePairForEmission(pair)
+	ch <- pair
+}
+
+// freezePairForEmission replaces an *file.FdFile attached to the pair with a
+// duplicate for the same fd; pairs with any other kind of file are left as
+// they are.
+// NOTE(review): presumably this detaches the emitted pair from the tracked
+// file so later fd state changes do not affect it; confirm against FdFile.Dup.
+func (e *eventLoop) freezePairForEmission(ep *event.Pair) {
+	if tracked, isFdFile := ep.File.(*file.FdFile); isFdFile {
+		ep.File = tracked.Dup(tracked.FD())
+	}
+}
diff --git a/internal/eventloop_state.go b/internal/eventloop_state.go
new file mode 100644
index 0000000..cd6e428
--- /dev/null
+++ b/internal/eventloop_state.go
@@ -0,0 +1,237 @@
+package internal
+
+import (
+ "sort"
+
+ "ior/internal/event"
+ "ior/internal/file"
+)
+
+// fdTracker keeps the files currently known to the event loop, keyed by file
+// descriptor number.
+type fdTracker struct {
+ files map[int32]file.File
+}
+
+// newFDTracker returns a tracker backed by files. The map is used as-is (not
+// copied); a nil map is replaced by a fresh empty one.
+func newFDTracker(files map[int32]file.File) *fdTracker {
+	tracker := &fdTracker{files: files}
+	if tracker.files == nil {
+		tracker.files = make(map[int32]file.File)
+	}
+	return tracker
+}
+
+// get returns the file tracked for fd and whether an entry exists.
+func (t *fdTracker) get(fd int32) (f file.File, ok bool) {
+	f, ok = t.files[fd]
+	return f, ok
+}
+
+// set tracks f under fd, replacing any previous entry.
+func (t *fdTracker) set(fd int32, f file.File) {
+	tracked := t.files
+	tracked[fd] = f
+}
+
+// delete stops tracking fd; it is a no-op when fd is not tracked.
+func (t *fdTracker) delete(fd int32) {
+	tracked := t.files
+	delete(tracked, fd)
+}
+
+// closeRangeFrom stops tracking every fd that is greater than or equal to
+// first.
+func (t *fdTracker) closeRangeFrom(first int32) {
+	for fd := range t.files {
+		if fd < first {
+			continue
+		}
+		delete(t.files, fd)
+	}
+}
+
+// resolveFdFile returns the file for fd of process pid.
+//
+// Lookup order: the fd tracker, then (for non-negative fds) the proc fd cache,
+// and finally a fresh file.NewFdWithPid resolution whose result is stored in
+// the proc fd cache. A negative fd that is not tracked yields a placeholder
+// with an empty name and flags -1.
+func (e *eventLoop) resolveFdFile(fd int32, pid uint32) file.File {
+	if tracked, ok := e.fdState().get(fd); ok {
+		return tracked
+	}
+	if fd < 0 {
+		return file.NewFd(fd, "", -1)
+	}
+
+	resolved, hit := e.cachedProcFdFile(fd, pid)
+	if !hit {
+		// Remember the first resolution so hot unknown fds do not trigger a
+		// new lookup on every event.
+		resolved = file.NewFdWithPid(fd, pid)
+		e.setProcFdCache(fd, pid, resolved)
+	}
+	return resolved
+}
+
+// cachedProcFdFile looks up the proc fd cache entry for (pid, fd). A hit
+// refreshes the entry's age so that it counts as recently used when the cache
+// is pruned.
+func (e *eventLoop) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) {
+	key := procFdCacheKey(pid, fd)
+	entry, hit := e.procFdCacheState()[key]
+	if !hit {
+		return entry, false
+	}
+	e.procFdCacheAgeState()[key] = e.nextCacheAge()
+	return entry, true
+}
+
+// setProcFdCache stores resolved as the proc fd cache entry for (pid, fd),
+// stamps it with a fresh age, and prunes the cache if it grew past its limit.
+func (e *eventLoop) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) {
+	key := procFdCacheKey(pid, fd)
+	age := e.nextCacheAge()
+
+	e.procFdCacheState()[key] = resolved
+	e.procFdCacheAgeState()[key] = age
+	e.pruneProcFdCache()
+}
+
+// deleteProcFdCache removes the proc fd cache entry for (pid, fd), if any.
+func (e *eventLoop) deleteProcFdCache(fd int32, pid uint32) {
+	key := procFdCacheKey(pid, fd)
+	e.deleteProcFdCacheKey(key)
+}
+
+// deleteProcFdCacheFrom removes every proc fd cache entry of process pid whose
+// fd is greater than or equal to first. Keys carry the pid in the upper 32
+// bits and the fd in the lower 32 bits.
+func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) {
+	for key := range e.procFdCacheState() {
+		if uint32(key>>32) != pid {
+			continue
+		}
+		if int32(uint32(key)) < first {
+			continue
+		}
+		e.deleteProcFdCacheKey(key)
+	}
+}
+
+// procFdCacheState returns the proc fd cache, allocating it on first use.
+func (e *eventLoop) procFdCacheState() map[uint64]*file.FdFile {
+	if e.procFdCache != nil {
+		return e.procFdCache
+	}
+	e.procFdCache = make(map[uint64]*file.FdFile)
+	return e.procFdCache
+}
+
+// procFdCacheAgeState returns the map of proc fd cache entry ages, allocating
+// it on first use.
+func (e *eventLoop) procFdCacheAgeState() map[uint64]uint64 {
+	if e.procFdCacheAges != nil {
+		return e.procFdCacheAges
+	}
+	e.procFdCacheAges = make(map[uint64]uint64)
+	return e.procFdCacheAges
+}
+
+// enterEventAgeState returns the map of pending enter event ages keyed by
+// thread id, allocating it on first use.
+func (e *eventLoop) enterEventAgeState() map[uint32]uint64 {
+	if e.enterEvAges != nil {
+		return e.enterEvAges
+	}
+	e.enterEvAges = make(map[uint32]uint64)
+	return e.enterEvAges
+}
+
+// enterEventState returns the map of pending pairs keyed by thread id,
+// allocating it on first use.
+func (e *eventLoop) enterEventState() map[uint32]*event.Pair {
+	if e.enterEvs != nil {
+		return e.enterEvs
+	}
+	e.enterEvs = make(map[uint32]*event.Pair)
+	return e.enterEvs
+}
+
+// setEnterEvent stores a new pair for enterEv as the pending pair of its
+// thread. A pair that was still pending for the same thread is recycled. The
+// new entry is stamped with a fresh age and the pending set is pruned if it
+// grew past its limit.
+func (e *eventLoop) setEnterEvent(enterEv event.Event) {
+	tid := enterEv.GetTid()
+	fresh := event.NewPair(enterEv)
+
+	pending := e.enterEventState()
+	if stale := pending[tid]; stale != nil {
+		stale.Recycle()
+	}
+	pending[tid] = fresh
+
+	e.enterEventAgeState()[tid] = e.nextCacheAge()
+	e.prunePendingEnterEvents()
+}
+
+// consumeEnterEvent removes and returns the pending pair of thread tid. It
+// returns (nil, false) when no pair is pending for the thread.
+func (e *eventLoop) consumeEnterEvent(tid uint32) (*event.Pair, bool) {
+	pending := e.enterEventState()
+	pair, found := pending[tid]
+	if found {
+		delete(pending, tid)
+		delete(e.enterEventAgeState(), tid)
+	}
+	return pair, found
+}
+
+// prunePendingEnterEvents trims the oldest pending pairs down to the trim
+// target once the number of pending pairs exceeds the configured limit.
+func (e *eventLoop) prunePendingEnterEvents() {
+	pending, limit := e.enterEventState(), e.pendingEnterLimit()
+	if len(pending) > limit {
+		trimOldestPendingPairs(pending, e.enterEventAgeState(), trimTarget(limit))
+	}
+}
+
+// trimOldestPendingPairs removes the entries with the smallest ages from
+// state (and their ages from ages) until state holds targetSize entries.
+// Removed pairs are recycled. Entries without a recorded age count as age 0.
+// Nothing happens when state is already at or below targetSize.
+func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64, targetSize int) {
+	excess := len(state) - targetSize
+	if excess <= 0 {
+		return
+	}
+
+	type agedTid struct {
+		tid uint32
+		age uint64
+	}
+	byAge := make([]agedTid, 0, len(state))
+	for tid := range state {
+		byAge = append(byAge, agedTid{tid: tid, age: ages[tid]})
+	}
+	sort.Slice(byAge, func(a, b int) bool { return byAge[a].age < byAge[b].age })
+
+	for _, victim := range byAge[:excess] {
+		if pair := state[victim.tid]; pair != nil {
+			pair.Recycle()
+		}
+		delete(state, victim.tid)
+		delete(ages, victim.tid)
+	}
+}
+
+// pruneProcFdCache trims the oldest proc fd cache entries down to the trim
+// target once the cache exceeds the configured limit.
+func (e *eventLoop) pruneProcFdCache() {
+	cache, limit := e.procFdCacheState(), e.procFdCacheLimit()
+	if len(cache) > limit {
+		trimOldestProcFdEntries(cache, e.procFdCacheAgeState(), trimTarget(limit))
+	}
+}
+
+// trimOldestProcFdEntries removes the entries with the smallest ages from
+// state (and their ages from ages) until state holds targetSize entries.
+// Entries without a recorded age count as age 0. Nothing happens when state is
+// already at or below targetSize.
+func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint64, targetSize int) {
+	excess := len(state) - targetSize
+	if excess <= 0 {
+		return
+	}
+
+	type agedKey struct {
+		key uint64
+		age uint64
+	}
+	byAge := make([]agedKey, 0, len(state))
+	for key := range state {
+		byAge = append(byAge, agedKey{key: key, age: ages[key]})
+	}
+	sort.Slice(byAge, func(a, b int) bool { return byAge[a].age < byAge[b].age })
+
+	for _, victim := range byAge[:excess] {
+		delete(state, victim.key)
+		delete(ages, victim.key)
+	}
+}
+
+// deleteProcFdCacheKey removes key from both the proc fd cache and its age
+// map.
+func (e *eventLoop) deleteProcFdCacheKey(key uint64) {
+	cache := e.procFdCacheState()
+	delete(cache, key)
+	ages := e.procFdCacheAgeState()
+	delete(ages, key)
+}
+
+// nextCacheAge advances the loop's age counter by one and returns the new
+// value; larger values mean more recent use.
+func (e *eventLoop) nextCacheAge() uint64 {
+	next := e.cacheAge + 1
+	e.cacheAge = next
+	return next
+}
+
+// pendingEnterLimit returns the configured maximum number of pending enter
+// events, or the package default when no positive limit is configured.
+func (e *eventLoop) pendingEnterLimit() int {
+	if limit := e.maxPendingEnterEvs; limit > 0 {
+		return limit
+	}
+	return defaultMaxPendingEnterEvs
+}
+
+// procFdCacheLimit returns the configured maximum proc fd cache size, or the
+// package default when no positive limit is configured.
+func (e *eventLoop) procFdCacheLimit() int {
+	if limit := e.maxProcFdCacheSize; limit > 0 {
+		return limit
+	}
+	return defaultMaxProcFdCacheSize
+}
+
+// trimTarget returns the size a cache is trimmed down to once it exceeds
+// limit: limit reduced by limit/cacheTrimDivisor, but never less than 1.
+func trimTarget(limit int) int {
+	if target := limit - limit/cacheTrimDivisor; target >= 1 {
+		return target
+	}
+	return 1
+}
+
+// procFdCacheKey packs pid and fd into one cache key: pid occupies the upper
+// 32 bits and the fd's 32-bit pattern the lower 32 bits.
+func procFdCacheKey(pid uint32, fd int32) uint64 {
+	upper := uint64(pid) << 32
+	lower := uint64(uint32(fd))
+	return upper | lower
+}