From 1731c3723ced92a5dc8e54fb0caf4e33b2c7ba70 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 18 Mar 2026 09:04:17 +0200 Subject: refactor: extract pairTracker and extend fdTracker to reduce eventLoop responsibilities (task 428) The eventLoop struct held 20+ fields across 5+ responsibilities (SRP violation). Extract two cohesive sub-structs: - pairTracker: enter/exit pair matching, age-based LRU pruning, and DurationToPrev tracking. Replaces enterEvs/enterEvAges/prevPairTimes/ maxPendingEnterEvs/cacheAge fields with a single embedded value. - fdTracker (extended): absorbs procFdCache/procFdAges/maxProcFdCacheSize, moving all procfs-resolution cache logic (resolve, cache, prune, delete) off eventLoop and onto the tracker that already owns the fd table. eventLoop drops from 20 fields to 12. All methods that previously reached into eventLoop fields now live on the struct that owns the data. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- internal/bench_components_test.go | 6 +- internal/eventloop.go | 47 +++---- internal/eventloop_cleanup_test.go | 42 +++--- internal/eventloop_error_handling_test.go | 4 +- internal/eventloop_exit.go | 10 +- internal/eventloop_filter_test.go | 35 +++-- internal/eventloop_runtime.go | 43 +++--- internal/eventloop_state.go | 217 +++++++++++++++++------------- internal/eventloop_test.go | 18 +-- 9 files changed, 225 insertions(+), 197 deletions(-) (limited to 'internal') diff --git a/internal/bench_components_test.go b/internal/bench_components_test.go index 715aabc..c7b724e 100644 --- a/internal/bench_components_test.go +++ b/internal/bench_components_test.go @@ -130,8 +130,8 @@ func BenchmarkTracepointEntered(b *testing.B) { for i := 0; i < b.N; i++ { enterEv := types.NewOpenEvent(raw) el.tracepointEntered(enterEv) - if ep, ok := el.enterEvs[componentBenchTID]; ok { - delete(el.enterEvs, componentBenchTID) + if ep, ok := el.pairs.enters[componentBenchTID]; ok { + delete(el.pairs.enters, componentBenchTID) // tracepointEntered stores only EnterEv; provide a placeholder so Pair.Recycle can return to the pool. ep.ExitEv = &types.NullEvent{} ep.Recycle() @@ -157,7 +157,7 @@ func BenchmarkTracepointExited(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { enterEv := types.NewNullEvent(enterRaw) - el.enterEvs[componentBenchTID] = event.NewPair(enterEv) + el.pairs.enters[componentBenchTID] = event.NewPair(enterEv) exitEv := types.NewNullEvent(exitRaw) el.tracepointExited(exitEv, out) (<-out).Recycle() diff --git a/internal/eventloop.go b/internal/eventloop.go index b7fe230..645f6af 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -37,22 +37,15 @@ type eventLoopConfig struct { type rawEventHandler func(raw []byte, ch chan<- *event.Pair) type eventLoop struct { - filter globalfilter.Filter - enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. - enterEvAges map[uint32]uint64 - pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at - fdTracker *fdTracker - procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds. - procFdCacheAges map[uint64]uint64 - commResolver *commResolver - prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) - rawHandlers map[types.EventType]rawEventHandler - printCb func(ep *event.Pair) // Callback to print the event - warningCb func(message string) // Optional callback for non-fatal event processing warnings - cfg eventLoopConfig - cacheAge uint64 - maxPendingEnterEvs int - maxProcFdCacheSize int + filter globalfilter.Filter + pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking + pendingHandles map[uint32]string // TID → pathname from name_to_handle_at, for open_by_handle_at correlation + fdTracker *fdTracker // fd table and procfs resolution cache + commResolver *commResolver + rawHandlers map[types.EventType]rawEventHandler + printCb func(ep *event.Pair) // Callback to print the event + warningCb func(message string) // Optional callback for non-fatal event processing warnings + cfg eventLoopConfig // Statistics numTracepoints uint @@ -71,19 +64,15 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { } el := &eventLoop{ - filter: cfg.filter.Clone(), - enterEvs: make(map[uint32]*event.Pair), - enterEvAges: make(map[uint32]uint64), - pendingHandles: make(map[uint32]string), - fdTracker: fdState, - procFdCache: make(map[uint64]*file.FdFile), - procFdCacheAges: make(map[uint64]uint64), - commResolver: commState, - prevPairTimes: make(map[uint32]uint64), - rawHandlers: make(map[types.EventType]rawEventHandler), - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - cfg: cfg, - done: make(chan struct{}), + filter: cfg.filter.Clone(), + pairs: newPairTracker(), + pendingHandles: make(map[uint32]string), + fdTracker: fdState, + commResolver: commState, + rawHandlers: make(map[types.EventType]rawEventHandler), + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + cfg: cfg, + done: make(chan struct{}), } el.initRawHandlers() el.configureOutputCallback() diff --git a/internal/eventloop_cleanup_test.go b/internal/eventloop_cleanup_test.go index f76bfe7..1d3a6fb 100644 --- a/internal/eventloop_cleanup_test.go +++ b/internal/eventloop_cleanup_test.go @@ -8,8 +8,8 @@ import ( func TestTracepointEnteredPrunesOldestPendingPairs(t *testing.T) { el := &eventLoop{ - commResolver: newCommResolver(make(map[uint32]string)), - maxPendingEnterEvs: 2, + commResolver: newCommResolver(make(map[uint32]string)), + pairs: pairTracker{maxSize: 2}, } enterOne, _ := makeEnterOpenEvent(t, defaulTime, defaultPid, defaultTid) @@ -20,20 +20,20 @@ func TestTracepointEnteredPrunesOldestPendingPairs(t *testing.T) { el.tracepointEntered(&enterTwo) el.tracepointEntered(&enterThree) - if _, ok := el.enterEvs[defaultTid]; ok { + if _, ok := el.pairs.enters[defaultTid]; ok { t.Fatalf("expected oldest pending enter event to be evicted") } - if _, ok := el.enterEvs[defaultTid+1]; !ok { + if _, ok := el.pairs.enters[defaultTid+1]; !ok { t.Fatalf("expected newer pending enter event to be retained") } - if _, ok := el.enterEvs[defaultTid+2]; !ok { + if _, ok := el.pairs.enters[defaultTid+2]; !ok { t.Fatalf("expected newest pending enter event to be retained") } - if got := len(el.enterEvAges); got != 2 { + if got := len(el.pairs.enterAges); got != 2 { t.Fatalf("pending enter metadata size = %d, want 2", got) } - for _, pair := range el.enterEvs { + for _, pair := range el.pairs.enters { pair.Recycle() } } @@ -42,9 +42,9 @@ func TestConsumeEnterEventClearsPendingPairMetadata(t *testing.T) { el := &eventLoop{} enterOne, _ := makeEnterOpenEvent(t, defaulTime, defaultPid, defaultTid) - el.setEnterEvent(&enterOne) + el.pairs.set(&enterOne) - pair, ok := el.consumeEnterEvent(defaultTid) + pair, ok := el.pairs.consume(defaultTid) if !ok { t.Fatalf("expected pending enter event to be consumed") } @@ -53,36 +53,38 @@ func TestConsumeEnterEventClearsPendingPairMetadata(t *testing.T) { } pair.Recycle() - if _, ok := el.enterEvs[defaultTid]; ok { + if _, ok := el.pairs.enters[defaultTid]; ok { t.Fatalf("expected pending enter pair to be removed") } - if _, ok := el.enterEvAges[defaultTid]; ok { + if _, ok := el.pairs.enterAges[defaultTid]; ok { t.Fatalf("expected pending enter metadata to be removed") } } func TestProcFdCacheRetainsRecentlyUsedEntries(t *testing.T) { - el := &eventLoop{maxProcFdCacheSize: 2} + fdt := newFDTracker(nil) + fdt.maxCacheSize = 2 + el := &eventLoop{fdTracker: fdt} - el.setProcFdCache(10, defaultPid, file.NewFdWithPid(10, defaultPid)) - el.setProcFdCache(11, defaultPid, file.NewFdWithPid(11, defaultPid)) + el.fdTracker.setProcFdCache(10, defaultPid, file.NewFdWithPid(10, defaultPid)) + el.fdTracker.setProcFdCache(11, defaultPid, file.NewFdWithPid(11, defaultPid)) - if _, ok := el.cachedProcFdFile(10, defaultPid); !ok { + if _, ok := el.fdTracker.cachedProcFdFile(10, defaultPid); !ok { t.Fatalf("expected first cache entry to exist before refresh") } - el.setProcFdCache(12, defaultPid, file.NewFdWithPid(12, defaultPid)) + el.fdTracker.setProcFdCache(12, defaultPid, file.NewFdWithPid(12, defaultPid)) - if _, ok := el.cachedProcFdFile(10, defaultPid); !ok { + if _, ok := el.fdTracker.cachedProcFdFile(10, defaultPid); !ok { t.Fatalf("expected recently used cache entry to be retained") } - if _, ok := el.cachedProcFdFile(11, defaultPid); ok { + if _, ok := el.fdTracker.cachedProcFdFile(11, defaultPid); ok { t.Fatalf("expected least recently used cache entry to be evicted") } - if _, ok := el.cachedProcFdFile(12, defaultPid); !ok { + if _, ok := el.fdTracker.cachedProcFdFile(12, defaultPid); !ok { t.Fatalf("expected newest cache entry to be retained") } - if got := len(el.procFdCacheAges); got != 2 { + if got := len(el.fdTracker.procFdAges); got != 2 { t.Fatalf("proc fd cache metadata size = %d, want 2", got) } } diff --git a/internal/eventloop_error_handling_test.go b/internal/eventloop_error_handling_test.go index 5867417..0851ff6 100644 --- a/internal/eventloop_error_handling_test.go +++ b/internal/eventloop_error_handling_test.go @@ -43,7 +43,7 @@ func TestTracepointExitedMalformedOpenExitDoesNotPanicAndNotifies(t *testing.T) t.Fatalf("expected warning notification") } - if _, ok := el.enterEvs[enterEv.Tid]; ok { + if _, ok := el.pairs.enters[enterEv.Tid]; ok { t.Fatalf("expected enter event to be removed for tid %d", enterEv.Tid) } } @@ -229,7 +229,7 @@ func TestTracepointEnteredMissingCommWithCommFilterNotifies(t *testing.T) { t.Fatalf("expected warning notification") } - if _, ok := el.enterEvs[defaultTid]; ok { + if _, ok := el.pairs.enters[defaultTid]; ok { t.Fatalf("expected no enter event to be stored for tid %d", defaultTid) } } diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go index e40a3fd..a9dd4c5 100644 --- a/internal/eventloop_exit.go +++ b/internal/eventloop_exit.go @@ -96,10 +96,10 @@ func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *types.PathEvent) bool func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool { fd := fdEv.Fd - ep.File = e.resolveFdFile(fd, fdEv.Pid) + ep.File = e.fdState().resolve(fd, fdEv.Pid) if ep.Is(types.SYS_ENTER_CLOSE) { e.fdState().delete(fd) - e.deleteProcFdCache(fd, fdEv.Pid) + e.fdState().deleteProcFdCache(fd, fdEv.Pid) } if ep.Is(types.SYS_ENTER_CLOSE_RANGE) { // close_range provides (first, last), but fd_event only carries the first @@ -107,7 +107,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool { retEv, ok := ep.ExitEv.(*types.RetEvent) if ok && retEv.Ret == 0 { e.fdState().closeRangeFrom(fd) - e.deleteProcFdCacheFrom(fd, fdEv.Pid) + e.fdState().deleteProcFdCacheFrom(fd, fdEv.Pid) } } ep.Comm = e.comm(fdEv.GetTid()) @@ -150,7 +150,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool { func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *types.Dup3Event) bool { fd := int32(dup3Ev.Fd) - ep.File = e.resolveFdFile(fd, dup3Ev.Pid) + ep.File = e.fdState().resolve(fd, dup3Ev.Pid) ep.Comm = e.comm(dup3Ev.GetTid()) if !e.filter.MatchPair(ep) { ep.Recycle() @@ -242,7 +242,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *types.NullEvent) bool func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *types.FcntlEvent) bool { ep.Comm = e.comm(fcntlEv.GetTid()) fd := int32(fcntlEv.Fd) - ep.File = e.resolveFdFile(fd, fcntlEv.Pid) + ep.File = e.fdState().resolve(fd, fcntlEv.Pid) if !e.filter.MatchPair(ep) { ep.Recycle() return false diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go index ec8e75b..4e45060 100644 --- a/internal/eventloop_filter_test.go +++ b/internal/eventloop_filter_test.go @@ -450,12 +450,11 @@ func TestCommFilterToggle(t *testing.T) { // Create eventloop without comm filter el := &eventLoop{ - filter: globalfilter.Filter{}, - enterEvs: make(map[uint32]*event.Pair), - fdTracker: newFDTracker(make(map[int32]file.File)), - commResolver: newCommResolver(make(map[uint32]string)), - prevPairTimes: make(map[uint32]uint64), - cfg: eventLoopConfig{synchronousRawProcessing: true}, + filter: globalfilter.Filter{}, + pairs: newPairTracker(), + fdTracker: newFDTracker(make(map[int32]file.File)), + commResolver: newCommResolver(make(map[uint32]string)), + cfg: eventLoopConfig{synchronousRawProcessing: true}, printCb: func(ep *event.Pair) { next := synchronizedPair{pair: ep, ack: make(chan struct{})} outCh <- next @@ -495,11 +494,10 @@ func TestCommFilterToggle(t *testing.T) { filter: globalfilter.Filter{ Comm: &globalfilter.StringFilter{Pattern: "test"}, }, - enterEvs: make(map[uint32]*event.Pair), - fdTracker: newFDTracker(make(map[int32]file.File)), - commResolver: newCommResolver(make(map[uint32]string)), - prevPairTimes: make(map[uint32]uint64), - cfg: eventLoopConfig{synchronousRawProcessing: true}, + pairs: newPairTracker(), + fdTracker: newFDTracker(make(map[int32]file.File)), + commResolver: newCommResolver(make(map[uint32]string)), + cfg: eventLoopConfig{synchronousRawProcessing: true}, printCb: func(ep *event.Pair) { next := synchronizedPair{pair: ep, ack: make(chan struct{})} outCh <- next @@ -529,14 +527,13 @@ func TestCommFilterToggle(t *testing.T) { func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { el := &eventLoop{ - filter: testFilter(commFilter, pathFilter), - enterEvs: make(map[uint32]*event.Pair), - fdTracker: newFDTracker(make(map[int32]file.File)), - commResolver: newCommResolver(make(map[uint32]string)), - prevPairTimes: make(map[uint32]uint64), - cfg: eventLoopConfig{synchronousRawProcessing: true}, - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - done: make(chan struct{}), + filter: testFilter(commFilter, pathFilter), + pairs: newPairTracker(), + fdTracker: newFDTracker(make(map[int32]file.File)), + commResolver: newCommResolver(make(map[uint32]string)), + cfg: eventLoopConfig{synchronousRawProcessing: true}, + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + done: make(chan struct{}), } return el } diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 12d9f12..7f540ec 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -25,6 +25,7 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { if e.printCb == nil { e.printCb = func(ep *event.Pair) { ep.Recycle() } } + e.initRawHandlers() if e.cfg.synchronousRawProcessing { e.runSynchronously(ctx, rawCh) return @@ -48,20 +49,24 @@ func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { continue } e.processRawEvent(raw, pairs) - for { - select { - case ep := <-pairs: - e.printCb(ep) - e.numSyscallsAfterFilter++ - default: - goto nextRaw - } - } + e.drainPairs(pairs) case <-ctx.Done(): fmt.Println("Stopping event loop") return } - nextRaw: + } +} + +// drainPairs consumes all immediately available pairs from the buffered channel. +func (e *eventLoop) drainPairs(pairs <-chan *event.Pair) { + for { + select { + case ep := <-pairs: + e.printCb(ep) + e.numSyscallsAfterFilter++ + default: + return + } } } @@ -92,8 +97,10 @@ func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *eve } func (e *eventLoop) processRawEvent(raw []byte, ch chan<- *event.Pair) { + if len(raw) == 0 { + return + } e.numTracepoints++ - e.initRawHandlers() evType := types.EventType(raw[0]) handler, ok := e.rawHandlers[evType] if !ok { @@ -217,17 +224,17 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { // Schedule comm lookup as early as possible to reduce races for short-lived processes. e.queueCommLookup(tid) if !e.filter.UsesCommFilter() { - e.setEnterEvent(enterEv) + e.pairs.set(enterEv) return } switch enterEv.(type) { case *types.OpenEvent: - e.setEnterEvent(enterEv) + e.pairs.set(enterEv) default: // Only, when we have a comm name if _, ok := e.cachedComm(tid); ok { - e.setEnterEvent(enterEv) + e.pairs.set(enterEv) } else { e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv)) } @@ -235,7 +242,7 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { } func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) { - ep, ok := e.consumeEnterEvent(exitEv.GetTid()) + ep, ok := e.pairs.consume(exitEv.GetTid()) if !ok { exitEv.Recycle() return @@ -255,9 +262,9 @@ func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) if !e.handleTracepointExit(ep) { return } - prevPairTime, _ := e.prevPairTimes[ep.EnterEv.GetTid()] - ep.CalculateDurations(prevPairTime) - e.prevPairTimes[ep.EnterEv.GetTid()] = ep.ExitEv.GetTime() + tid := ep.EnterEv.GetTid() + ep.CalculateDurations(e.pairs.prevTime(tid)) + e.pairs.setPrevTime(tid, ep.ExitEv.GetTime()) e.freezePairForEmission(ep) ch <- ep } diff --git a/internal/eventloop_state.go b/internal/eventloop_state.go index cd6e428..9622fd1 100644 --- a/internal/eventloop_state.go +++ b/internal/eventloop_state.go @@ -1,21 +1,32 @@ package internal import ( - "sort" + "cmp" + "slices" "ior/internal/event" "ior/internal/file" ) +// fdTracker holds the process's open file-descriptor table and a procfs +// resolution cache for fds that were opened before tracing started. type fdTracker struct { - files map[int32]file.File + files map[int32]file.File + procFdCache map[uint64]*file.FdFile // procfs-resolved metadata for unknown FDs + procFdAges map[uint64]uint64 // access age per cache entry, for LRU eviction + maxCacheSize int // max entries before eviction; 0 = defaultMaxProcFdCacheSize + age uint64 // monotonic counter for LRU ordering } func newFDTracker(files map[int32]file.File) *fdTracker { if files == nil { files = make(map[int32]file.File) } - return &fdTracker{files: files} + return &fdTracker{ + files: files, + procFdCache: make(map[uint64]*file.FdFile), + procFdAges: make(map[uint64]uint64), + } } func (t *fdTracker) get(fd int32) (file.File, bool) { @@ -39,111 +50,168 @@ func (t *fdTracker) closeRangeFrom(first int32) { } } -func (e *eventLoop) resolveFdFile(fd int32, pid uint32) file.File { - if fdFile, ok := e.fdState().get(fd); ok { +// resolve returns the file.File for fd, checking the fd table first, then the +// procfs cache, and finally resolving via procfs and caching the result. +func (t *fdTracker) resolve(fd int32, pid uint32) file.File { + if fdFile, ok := t.get(fd); ok { return fdFile } if fd < 0 { return file.NewFd(fd, "", -1) } - - if cached, ok := e.cachedProcFdFile(fd, pid); ok { + if cached, ok := t.cachedProcFdFile(fd, pid); ok { return cached } - // Cache first procfs resolution to avoid repeated /proc lookups for hot unknown FDs. discovered := file.NewFdWithPid(fd, pid) - e.setProcFdCache(fd, pid, discovered) + t.setProcFdCache(fd, pid, discovered) return discovered } -func (e *eventLoop) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) { +func (t *fdTracker) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) { + if t.procFdCache == nil { + return nil, false + } key := procFdCacheKey(pid, fd) - cache, ok := e.procFdCacheState()[key] + cache, ok := t.procFdCache[key] if ok { - e.procFdCacheAgeState()[key] = e.nextCacheAge() + t.age++ + t.procFdAges[key] = t.age } return cache, ok } -func (e *eventLoop) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) { +func (t *fdTracker) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) { + if t.procFdCache == nil { + t.procFdCache = make(map[uint64]*file.FdFile) + t.procFdAges = make(map[uint64]uint64) + } key := procFdCacheKey(pid, fd) - e.procFdCacheState()[key] = resolved - e.procFdCacheAgeState()[key] = e.nextCacheAge() - e.pruneProcFdCache() + t.age++ + t.procFdCache[key] = resolved + t.procFdAges[key] = t.age + t.pruneCache() } -func (e *eventLoop) deleteProcFdCache(fd int32, pid uint32) { - e.deleteProcFdCacheKey(procFdCacheKey(pid, fd)) +func (t *fdTracker) deleteProcFdCache(fd int32, pid uint32) { + t.deleteCacheKey(procFdCacheKey(pid, fd)) } -func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) { - cache := e.procFdCacheState() - for key := range cache { +func (t *fdTracker) deleteProcFdCacheFrom(first int32, pid uint32) { + if t.procFdCache == nil { + return + } + for key := range t.procFdCache { cachePid := uint32(key >> 32) cacheFd := int32(uint32(key)) if cachePid == pid && cacheFd >= first { - e.deleteProcFdCacheKey(key) + t.deleteCacheKey(key) } } } -func (e *eventLoop) procFdCacheState() map[uint64]*file.FdFile { - if e.procFdCache == nil { - e.procFdCache = make(map[uint64]*file.FdFile) +func (t *fdTracker) pruneCache() { + if t.procFdCache == nil { + return + } + limit := t.cacheLimit() + if len(t.procFdCache) <= limit { + return } - return e.procFdCache + trimOldestProcFdEntries(t.procFdCache, t.procFdAges, trimTarget(limit)) } -func (e *eventLoop) procFdCacheAgeState() map[uint64]uint64 { - if e.procFdCacheAges == nil { - e.procFdCacheAges = make(map[uint64]uint64) +func (t *fdTracker) cacheLimit() int { + if t.maxCacheSize > 0 { + return t.maxCacheSize } - return e.procFdCacheAges + return defaultMaxProcFdCacheSize } -func (e *eventLoop) enterEventAgeState() map[uint32]uint64 { - if e.enterEvAges == nil { - e.enterEvAges = make(map[uint32]uint64) - } - return e.enterEvAges +// deleteCacheKey removes a cache entry by its composite key. +// delete on a nil map is a no-op in Go, so this is safe even before any cache entries are set. +func (t *fdTracker) deleteCacheKey(key uint64) { + delete(t.procFdCache, key) + delete(t.procFdAges, key) } -func (e *eventLoop) enterEventState() map[uint32]*event.Pair { - if e.enterEvs == nil { - e.enterEvs = make(map[uint32]*event.Pair) +// pairTracker holds the state for matching sys_enter events to their sys_exit +// counterparts and computing inter-syscall durations per TID. +type pairTracker struct { + enters map[uint32]*event.Pair // pending enter events, keyed by TID + enterAges map[uint32]uint64 // insertion order per TID, for LRU eviction + prevTimes map[uint32]uint64 // previous pair's exit time per TID, for DurationToPrev + maxSize int // max pending enter events before pruning; 0 = default + age uint64 // monotonic counter for LRU ordering +} + +func newPairTracker() pairTracker { + return pairTracker{ + enters: make(map[uint32]*event.Pair), + enterAges: make(map[uint32]uint64), + prevTimes: make(map[uint32]uint64), } - return e.enterEvs } -func (e *eventLoop) setEnterEvent(enterEv event.Event) { +// set stores enterEv as a pending enter event for its TID, recycling any +// prior unmatched enter for the same TID, then prunes if over the limit. +// Maps are initialized lazily on first write; consume is safe on a nil map because +// Go map reads on nil return the zero value. +func (p *pairTracker) set(enterEv event.Event) { + if p.enters == nil { + p.enters = make(map[uint32]*event.Pair) + p.enterAges = make(map[uint32]uint64) + p.prevTimes = make(map[uint32]uint64) + } tid := enterEv.GetTid() pair := event.NewPair(enterEv) - if prev, ok := e.enterEventState()[tid]; ok && prev != nil { + if prev, ok := p.enters[tid]; ok && prev != nil { prev.Recycle() } - e.enterEventState()[tid] = pair - e.enterEventAgeState()[tid] = e.nextCacheAge() - e.prunePendingEnterEvents() + p.age++ + p.enters[tid] = pair + p.enterAges[tid] = p.age + p.prune() } -func (e *eventLoop) consumeEnterEvent(tid uint32) (*event.Pair, bool) { - pair, ok := e.enterEventState()[tid] +// consume removes and returns the pending enter pair for tid. +// Reading a nil map returns the zero value in Go, so this is safe before any set call. +func (p *pairTracker) consume(tid uint32) (*event.Pair, bool) { + pair, ok := p.enters[tid] if !ok { return nil, false } - delete(e.enterEventState(), tid) - delete(e.enterEventAgeState(), tid) + delete(p.enters, tid) + delete(p.enterAges, tid) return pair, true } -func (e *eventLoop) prunePendingEnterEvents() { - state := e.enterEventState() - limit := e.pendingEnterLimit() - if len(state) <= limit { +// prevTime returns the exit time of the previous pair for tid, used to compute DurationToPrev. +func (p *pairTracker) prevTime(tid uint32) uint64 { + return p.prevTimes[tid] +} + +// setPrevTime records the exit time of the most recent completed pair for tid. +func (p *pairTracker) setPrevTime(tid uint32, t uint64) { + if p.prevTimes == nil { + p.prevTimes = make(map[uint32]uint64) + } + p.prevTimes[tid] = t +} + +func (p *pairTracker) prune() { + limit := p.limit() + if len(p.enters) <= limit { return } - trimOldestPendingPairs(state, e.enterEventAgeState(), trimTarget(limit)) + trimOldestPendingPairs(p.enters, p.enterAges, trimTarget(limit)) +} + +func (p *pairTracker) limit() int { + if p.maxSize > 0 { + return p.maxSize + } + return defaultMaxPendingEnterEvs } func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64, targetSize int) { @@ -157,10 +225,9 @@ func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64 } oldest := make([]pendingPairAge, 0, len(state)) for tid := range state { - age := ages[tid] - oldest = append(oldest, pendingPairAge{tid: tid, age: age}) + oldest = append(oldest, pendingPairAge{tid: tid, age: ages[tid]}) } - sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age }) + slices.SortFunc(oldest, func(a, b pendingPairAge) int { return cmp.Compare(a.age, b.age) }) for _, entry := range oldest[:excess] { if pair, ok := state[entry.tid]; ok && pair != nil { pair.Recycle() @@ -170,15 +237,6 @@ func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64 } } -func (e *eventLoop) pruneProcFdCache() { - state := e.procFdCacheState() - limit := e.procFdCacheLimit() - if len(state) <= limit { - return - } - trimOldestProcFdEntries(state, e.procFdCacheAgeState(), trimTarget(limit)) -} - func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint64, targetSize int) { excess := len(state) - targetSize if excess <= 0 { @@ -190,40 +248,15 @@ func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint } oldest := make([]procFdAge, 0, len(state)) for key := range state { - age := ages[key] - oldest = append(oldest, procFdAge{key: key, age: age}) + oldest = append(oldest, procFdAge{key: key, age: ages[key]}) } - sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age }) + slices.SortFunc(oldest, func(a, b procFdAge) int { return cmp.Compare(a.age, b.age) }) for _, entry := range oldest[:excess] { delete(state, entry.key) delete(ages, entry.key) } } -func (e *eventLoop) deleteProcFdCacheKey(key uint64) { - delete(e.procFdCacheState(), key) - delete(e.procFdCacheAgeState(), key) -} - -func (e *eventLoop) nextCacheAge() uint64 { - e.cacheAge++ - return e.cacheAge -} - -func (e *eventLoop) pendingEnterLimit() int { - if e.maxPendingEnterEvs > 0 { - return e.maxPendingEnterEvs - } - return defaultMaxPendingEnterEvs -} - -func (e *eventLoop) procFdCacheLimit() int { - if e.maxProcFdCacheSize > 0 { - return e.maxProcFdCacheSize - } - return defaultMaxProcFdCacheSize -} - func trimTarget(limit int) int { target := limit - limit/cacheTrimDivisor if target < 1 { diff --git a/internal/eventloop_test.go b/internal/eventloop_test.go index b9e1c89..c5aebe4 100644 --- a/internal/eventloop_test.go +++ b/internal/eventloop_test.go @@ -163,7 +163,7 @@ func TestHandleFdExitCloseClearsProcFdCache(t *testing.T) { pid := uint32(1001) fd := int32(55) - el.setProcFdCache(fd, pid, file.NewFd(fd, "stale", syscall.O_RDONLY)) + el.fdState().setProcFdCache(fd, pid, file.NewFd(fd, "stale", syscall.O_RDONLY)) verifyProcFdCached(t, el, pid, fd) enter := &types.FdEvent{ @@ -190,10 +190,10 @@ func TestHandleFdExitCloseRangeClearsProcFdCacheRange(t *testing.T) { el := mustNewEventLoop(t, eventLoopConfig{}) pid := uint32(2002) - el.setProcFdCache(10, pid, file.NewFd(10, "keep", syscall.O_RDONLY)) - el.setProcFdCache(20, pid, file.NewFd(20, "drop", syscall.O_RDONLY)) - el.setProcFdCache(30, pid, file.NewFd(30, "drop", syscall.O_RDONLY)) - el.setProcFdCache(20, pid+1, file.NewFd(20, "other-pid", syscall.O_RDONLY)) + el.fdState().setProcFdCache(10, pid, file.NewFd(10, "keep", syscall.O_RDONLY)) + el.fdState().setProcFdCache(20, pid, file.NewFd(20, "drop", syscall.O_RDONLY)) + el.fdState().setProcFdCache(30, pid, file.NewFd(30, "drop", syscall.O_RDONLY)) + el.fdState().setProcFdCache(20, pid+1, file.NewFd(20, "other-pid", syscall.O_RDONLY)) enter := &types.FdEvent{ TraceId: types.SYS_ENTER_CLOSE_RANGE, @@ -1720,13 +1720,13 @@ func verifyFdNotTracked(t *testing.T, el *eventLoop, fd int32) { } func verifyProcFdCached(t *testing.T, el *eventLoop, pid uint32, fd int32) { - if _, ok := el.cachedProcFdFile(fd, pid); !ok { + if _, ok := el.fdState().cachedProcFdFile(fd, pid); !ok { t.Errorf("Expected proc fd cache to contain pid=%d fd=%d", pid, fd) } } func verifyProcFdNotCached(t *testing.T, el *eventLoop, pid uint32, fd int32) { - if _, ok := el.cachedProcFdFile(fd, pid); ok { + if _, ok := el.fdState().cachedProcFdFile(fd, pid); ok { t.Errorf("Expected proc fd cache to not contain pid=%d fd=%d", pid, fd) } } @@ -1742,13 +1742,13 @@ func verifyNoEventOutput(t *testing.T, outCh <-chan *event.Pair, timeout time.Du } func verifyEnterEventPending(t *testing.T, el *eventLoop, tid uint32) { - if _, ok := el.enterEvs[tid]; !ok { + if _, ok := el.pairs.enters[tid]; !ok { t.Errorf("Expected enter event for tid %d to be pending but it wasn't found", tid) } } func verifyNoEnterEventPending(t *testing.T, el *eventLoop, tid uint32) { - if _, ok := el.enterEvs[tid]; ok { + if _, ok := el.pairs.enters[tid]; ok { t.Errorf("Expected no enter event for tid %d but one was found", tid) } } -- cgit v1.2.3