diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-10 22:34:17 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-10 22:34:17 +0200 |
| commit | 80d8993dca4cf7945c492406489fb9d966e2dc44 (patch) | |
| tree | 51bb8eda2a60c17cf04294370d095b6f86d0f035 | |
| parent | d1aeb4afa9afee8a0cce8827b4c2dd9f8c01fe5b (diff) | |
eventloop: bound pending enter and proc-fd caches (task 425)
| -rw-r--r-- | internal/event/pair.go | 10 | ||||
| -rw-r--r-- | internal/event/pair_test.go | 10 | ||||
| -rw-r--r-- | internal/eventloop.go | 220 | ||||
| -rw-r--r-- | internal/eventloop_cleanup_test.go | 88 |
4 files changed, 293 insertions, 35 deletions
diff --git a/internal/event/pair.go b/internal/event/pair.go index 131c6b3..3eb8a16 100644 --- a/internal/event/pair.go +++ b/internal/event/pair.go @@ -120,8 +120,14 @@ func (e *Pair) Dump() string { } func (e *Pair) Recycle() { - e.EnterEv.Recycle() - e.ExitEv.Recycle() + if e.EnterEv != nil { + e.EnterEv.Recycle() + } + if e.ExitEv != nil { + e.ExitEv.Recycle() + } + e.EnterEv = nil + e.ExitEv = nil e.File = nil e.Comm = "" e.Duration = 0 diff --git a/internal/event/pair_test.go b/internal/event/pair_test.go index 43e9945..eb033dc 100644 --- a/internal/event/pair_test.go +++ b/internal/event/pair_test.go @@ -55,3 +55,13 @@ func TestPairCalculateDurationsWithPreviousExit(t *testing.T) { t.Fatalf("DurationToPrev = %d, want 500", pair.DurationToPrev) } } + +func TestPairRecycleHandlesMissingExitEvent(t *testing.T) { + pair := NewPair(&types.OpenEvent{ + Time: 1000, + Pid: 1, + Tid: 2, + }) + + pair.Recycle() +} diff --git a/internal/eventloop.go b/internal/eventloop.go index 6eba594..27b48e7 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "reflect" + "sort" "sync" "syscall" "time" @@ -23,6 +24,9 @@ const sysEnterNameToHandleAtName = "name_to_handle_at" const ( defaultCommLookupWorkers = 4 defaultCommLookupQueueSize = 512 + defaultMaxPendingEnterEvs = 16384 + defaultMaxProcFdCacheSize = 8192 + cacheTrimDivisor = 4 ) type eventLoopConfig struct { @@ -241,18 +245,23 @@ type rawEventHandler func(raw []byte, ch chan<- *event.Pair) type tracepointExitHandler func(ep *event.Pair) bool type eventLoop struct { - filter globalfilter.Filter - enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. - pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at - fdTracker *fdTracker - procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds. - commResolver *commResolver - prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) - rawHandlers map[types.EventType]rawEventHandler - exitHandlers map[reflect.Type]tracepointExitHandler - printCb func(ep *event.Pair) // Callback to print the event - warningCb func(message string) // Optional callback for non-fatal event processing warnings - cfg eventLoopConfig + filter globalfilter.Filter + enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. + enterEvAges map[uint32]uint64 + pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at + fdTracker *fdTracker + procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds. + procFdCacheAges map[uint64]uint64 + commResolver *commResolver + prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) + rawHandlers map[types.EventType]rawEventHandler + exitHandlers map[reflect.Type]tracepointExitHandler + printCb func(ep *event.Pair) // Callback to print the event + warningCb func(message string) // Optional callback for non-fatal event processing warnings + cfg eventLoopConfig + cacheAge uint64 + maxPendingEnterEvs int + maxProcFdCacheSize int // Statistics numTracepoints uint @@ -271,18 +280,20 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { } el := &eventLoop{ - filter: cfg.filter.Clone(), - enterEvs: make(map[uint32]*event.Pair), - pendingHandles: make(map[uint32]string), - fdTracker: fdState, - procFdCache: make(map[uint64]*file.FdFile), - commResolver: commState, - prevPairTimes: make(map[uint32]uint64), - rawHandlers: make(map[types.EventType]rawEventHandler), - exitHandlers: make(map[reflect.Type]tracepointExitHandler), - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - cfg: cfg, - done: make(chan struct{}), + filter: cfg.filter.Clone(), + enterEvs: make(map[uint32]*event.Pair), + enterEvAges: make(map[uint32]uint64), + pendingHandles: make(map[uint32]string), + fdTracker: fdState, + procFdCache: make(map[uint64]*file.FdFile), + procFdCacheAges: make(map[uint64]uint64), + commResolver: commState, + prevPairTimes: make(map[uint32]uint64), + rawHandlers: make(map[types.EventType]rawEventHandler), + exitHandlers: make(map[reflect.Type]tracepointExitHandler), + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + cfg: cfg, + done: make(chan struct{}), } el.initRawHandlers() el.initExitHandlers() @@ -582,17 +593,17 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { // Schedule comm lookup as early as possible to reduce races for short-lived processes. e.queueCommLookup(tid) if !e.filter.UsesCommFilter() { - e.enterEvs[tid] = event.NewPair(enterEv) + e.setEnterEvent(enterEv) return } switch enterEv.(type) { case *types.OpenEvent: - e.enterEvs[tid] = event.NewPair(enterEv) + e.setEnterEvent(enterEv) default: // Only, when we have a comm name if _, ok := e.cachedComm(tid); ok { - e.enterEvs[tid] = event.NewPair(enterEv) + e.setEnterEvent(enterEv) } else { e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv)) } @@ -600,12 +611,11 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { } func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) { - ep, ok := e.enterEvs[exitEv.GetTid()] + ep, ok := e.consumeEnterEvent(exitEv.GetTid()) if !ok { exitEv.Recycle() return } - delete(e.enterEvs, exitEv.GetTid()) ep.ExitEv = exitEv e.numSyscalls++ @@ -1006,16 +1016,23 @@ func (e *eventLoop) resolveFdFile(fd int32, pid uint32) file.File { } func (e *eventLoop) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) { - cache, ok := e.procFdCacheState()[procFdCacheKey(pid, fd)] + key := procFdCacheKey(pid, fd) + cache, ok := e.procFdCacheState()[key] + if ok { + e.procFdCacheAgeState()[key] = e.nextCacheAge() + } return cache, ok } func (e *eventLoop) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) { - e.procFdCacheState()[procFdCacheKey(pid, fd)] = resolved + key := procFdCacheKey(pid, fd) + e.procFdCacheState()[key] = resolved + e.procFdCacheAgeState()[key] = e.nextCacheAge() + e.pruneProcFdCache() } func (e *eventLoop) deleteProcFdCache(fd int32, pid uint32) { - delete(e.procFdCacheState(), procFdCacheKey(pid, fd)) + e.deleteProcFdCacheKey(procFdCacheKey(pid, fd)) } func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) { @@ -1024,7 +1041,7 @@ func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) { cachePid := uint32(key >> 32) cacheFd := int32(uint32(key)) if cachePid == pid && cacheFd >= first { - delete(cache, key) + e.deleteProcFdCacheKey(key) } } } @@ -1036,6 +1053,143 @@ func (e *eventLoop) procFdCacheState() map[uint64]*file.FdFile { return e.procFdCache } +func (e *eventLoop) procFdCacheAgeState() map[uint64]uint64 { + if e.procFdCacheAges == nil { + e.procFdCacheAges = make(map[uint64]uint64) + } + return e.procFdCacheAges +} + +func (e *eventLoop) enterEventAgeState() map[uint32]uint64 { + if e.enterEvAges == nil { + e.enterEvAges = make(map[uint32]uint64) + } + return e.enterEvAges +} + +func (e *eventLoop) enterEventState() map[uint32]*event.Pair { + if e.enterEvs == nil { + e.enterEvs = make(map[uint32]*event.Pair) + } + return e.enterEvs +} + +func (e *eventLoop) setEnterEvent(enterEv event.Event) { + tid := enterEv.GetTid() + pair := event.NewPair(enterEv) + if prev, ok := e.enterEventState()[tid]; ok && prev != nil { + prev.Recycle() + } + e.enterEventState()[tid] = pair + e.enterEventAgeState()[tid] = e.nextCacheAge() + e.prunePendingEnterEvents() +} + +func (e *eventLoop) consumeEnterEvent(tid uint32) (*event.Pair, bool) { + pair, ok := e.enterEventState()[tid] + if !ok { + return nil, false + } + delete(e.enterEventState(), tid) + delete(e.enterEventAgeState(), tid) + return pair, true +} + +func (e *eventLoop) prunePendingEnterEvents() { + state := e.enterEventState() + limit := e.pendingEnterLimit() + if len(state) <= limit { + return + } + trimOldestPendingPairs(state, e.enterEventAgeState(), trimTarget(limit)) +} + +func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64, targetSize int) { + excess := len(state) - targetSize + if excess <= 0 { + return + } + type pendingPairAge struct { + tid uint32 + age uint64 + } + oldest := make([]pendingPairAge, 0, len(state)) + for tid := range state { + age := ages[tid] + oldest = append(oldest, pendingPairAge{tid: tid, age: age}) + } + sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age }) + for _, entry := range oldest[:excess] { + if pair, ok := state[entry.tid]; ok && pair != nil { + pair.Recycle() + } + delete(state, entry.tid) + delete(ages, entry.tid) + } +} + +func (e *eventLoop) pruneProcFdCache() { + state := e.procFdCacheState() + limit := e.procFdCacheLimit() + if len(state) <= limit { + return + } + trimOldestProcFdEntries(state, e.procFdCacheAgeState(), trimTarget(limit)) +} + +func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint64, targetSize int) { + excess := len(state) - targetSize + if excess <= 0 { + return + } + type procFdAge struct { + key uint64 + age uint64 + } + oldest := make([]procFdAge, 0, len(state)) + for key := range state { + age := ages[key] + oldest = append(oldest, procFdAge{key: key, age: age}) + } + sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age }) + for _, entry := range oldest[:excess] { + delete(state, entry.key) + delete(ages, entry.key) + } +} + +func (e *eventLoop) deleteProcFdCacheKey(key uint64) { + delete(e.procFdCacheState(), key) + delete(e.procFdCacheAgeState(), key) +} + +func (e *eventLoop) nextCacheAge() uint64 { + e.cacheAge++ + return e.cacheAge +} + +func (e *eventLoop) pendingEnterLimit() int { + if e.maxPendingEnterEvs > 0 { + return e.maxPendingEnterEvs + } + return defaultMaxPendingEnterEvs +} + +func (e *eventLoop) procFdCacheLimit() int { + if e.maxProcFdCacheSize > 0 { + return e.maxProcFdCacheSize + } + return defaultMaxProcFdCacheSize +} + +func trimTarget(limit int) int { + target := limit - limit/cacheTrimDivisor + if target < 1 { + return 1 + } + return target +} + func procFdCacheKey(pid uint32, fd int32) uint64 { return uint64(pid)<<32 | uint64(uint32(fd)) } diff --git a/internal/eventloop_cleanup_test.go b/internal/eventloop_cleanup_test.go new file mode 100644 index 0000000..f76bfe7 --- /dev/null +++ b/internal/eventloop_cleanup_test.go @@ -0,0 +1,88 @@ +package internal + +import ( + "testing" + + "ior/internal/file" +) + +func TestTracepointEnteredPrunesOldestPendingPairs(t *testing.T) { + el := &eventLoop{ + commResolver: newCommResolver(make(map[uint32]string)), + maxPendingEnterEvs: 2, + } + + enterOne, _ := makeEnterOpenEvent(t, defaulTime, defaultPid, defaultTid) + enterTwo, _ := makeEnterOpenEvent(t, defaulTime+1, defaultPid, defaultTid+1) + enterThree, _ := makeEnterOpenEvent(t, defaulTime+2, defaultPid, defaultTid+2) + + el.tracepointEntered(&enterOne) + el.tracepointEntered(&enterTwo) + el.tracepointEntered(&enterThree) + + if _, ok := el.enterEvs[defaultTid]; ok { + t.Fatalf("expected oldest pending enter event to be evicted") + } + if _, ok := el.enterEvs[defaultTid+1]; !ok { + t.Fatalf("expected newer pending enter event to be retained") + } + if _, ok := el.enterEvs[defaultTid+2]; !ok { + t.Fatalf("expected newest pending enter event to be retained") + } + if got := len(el.enterEvAges); got != 2 { + t.Fatalf("pending enter metadata size = %d, want 2", got) + } + + for _, pair := range el.enterEvs { + pair.Recycle() + } +} + +func TestConsumeEnterEventClearsPendingPairMetadata(t *testing.T) { + el := &eventLoop{} + + enterOne, _ := makeEnterOpenEvent(t, defaulTime, defaultPid, defaultTid) + el.setEnterEvent(&enterOne) + + pair, ok := el.consumeEnterEvent(defaultTid) + if !ok { + t.Fatalf("expected pending enter event to be consumed") + } + if pair == nil { + t.Fatalf("expected consumed pair") + } + pair.Recycle() + + if _, ok := el.enterEvs[defaultTid]; ok { + t.Fatalf("expected pending enter pair to be removed") + } + if _, ok := el.enterEvAges[defaultTid]; ok { + t.Fatalf("expected pending enter metadata to be removed") + } +} + +func TestProcFdCacheRetainsRecentlyUsedEntries(t *testing.T) { + el := &eventLoop{maxProcFdCacheSize: 2} + + el.setProcFdCache(10, defaultPid, file.NewFdWithPid(10, defaultPid)) + el.setProcFdCache(11, defaultPid, file.NewFdWithPid(11, defaultPid)) + + if _, ok := el.cachedProcFdFile(10, defaultPid); !ok { + t.Fatalf("expected first cache entry to exist before refresh") + } + + el.setProcFdCache(12, defaultPid, file.NewFdWithPid(12, defaultPid)) + + if _, ok := el.cachedProcFdFile(10, defaultPid); !ok { + t.Fatalf("expected recently used cache entry to be retained") + } + if _, ok := el.cachedProcFdFile(11, defaultPid); ok { + t.Fatalf("expected least recently used cache entry to be evicted") + } + if _, ok := el.cachedProcFdFile(12, defaultPid); !ok { + t.Fatalf("expected newest cache entry to be retained") + } + if got := len(el.procFdCacheAges); got != 2 { + t.Fatalf("proc fd cache metadata size = %d, want 2", got) + } +} |
