summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-10 22:34:17 +0200
committerPaul Buetow <paul@buetow.org>2026-03-10 22:34:17 +0200
commit80d8993dca4cf7945c492406489fb9d966e2dc44 (patch)
tree51bb8eda2a60c17cf04294370d095b6f86d0f035
parentd1aeb4afa9afee8a0cce8827b4c2dd9f8c01fe5b (diff)
eventloop: bound pending enter and proc-fd caches (task 425)
-rw-r--r--internal/event/pair.go10
-rw-r--r--internal/event/pair_test.go10
-rw-r--r--internal/eventloop.go220
-rw-r--r--internal/eventloop_cleanup_test.go88
4 files changed, 293 insertions, 35 deletions
diff --git a/internal/event/pair.go b/internal/event/pair.go
index 131c6b3..3eb8a16 100644
--- a/internal/event/pair.go
+++ b/internal/event/pair.go
@@ -120,8 +120,14 @@ func (e *Pair) Dump() string {
}
func (e *Pair) Recycle() {
- e.EnterEv.Recycle()
- e.ExitEv.Recycle()
+ if e.EnterEv != nil {
+ e.EnterEv.Recycle()
+ }
+ if e.ExitEv != nil {
+ e.ExitEv.Recycle()
+ }
+ e.EnterEv = nil
+ e.ExitEv = nil
e.File = nil
e.Comm = ""
e.Duration = 0
diff --git a/internal/event/pair_test.go b/internal/event/pair_test.go
index 43e9945..eb033dc 100644
--- a/internal/event/pair_test.go
+++ b/internal/event/pair_test.go
@@ -55,3 +55,13 @@ func TestPairCalculateDurationsWithPreviousExit(t *testing.T) {
t.Fatalf("DurationToPrev = %d, want 500", pair.DurationToPrev)
}
}
+
+func TestPairRecycleHandlesMissingExitEvent(t *testing.T) {
+ pair := NewPair(&types.OpenEvent{
+ Time: 1000,
+ Pid: 1,
+ Tid: 2,
+ })
+
+ pair.Recycle()
+}
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 6eba594..27b48e7 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"reflect"
+ "sort"
"sync"
"syscall"
"time"
@@ -23,6 +24,9 @@ const sysEnterNameToHandleAtName = "name_to_handle_at"
const (
defaultCommLookupWorkers = 4
defaultCommLookupQueueSize = 512
+ defaultMaxPendingEnterEvs = 16384
+ defaultMaxProcFdCacheSize = 8192
+ cacheTrimDivisor = 4
)
type eventLoopConfig struct {
@@ -241,18 +245,23 @@ type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
type tracepointExitHandler func(ep *event.Pair) bool
type eventLoop struct {
- filter globalfilter.Filter
- enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid.
- pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at
- fdTracker *fdTracker
- procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds.
- commResolver *commResolver
- prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events)
- rawHandlers map[types.EventType]rawEventHandler
- exitHandlers map[reflect.Type]tracepointExitHandler
- printCb func(ep *event.Pair) // Callback to print the event
- warningCb func(message string) // Optional callback for non-fatal event processing warnings
- cfg eventLoopConfig
+ filter globalfilter.Filter
+ enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid.
+ enterEvAges map[uint32]uint64
+ pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at
+ fdTracker *fdTracker
+ procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds.
+ procFdCacheAges map[uint64]uint64
+ commResolver *commResolver
+ prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events)
+ rawHandlers map[types.EventType]rawEventHandler
+ exitHandlers map[reflect.Type]tracepointExitHandler
+ printCb func(ep *event.Pair) // Callback to print the event
+ warningCb func(message string) // Optional callback for non-fatal event processing warnings
+ cfg eventLoopConfig
+ cacheAge uint64
+ maxPendingEnterEvs int
+ maxProcFdCacheSize int
// Statistics
numTracepoints uint
@@ -271,18 +280,20 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) {
}
el := &eventLoop{
- filter: cfg.filter.Clone(),
- enterEvs: make(map[uint32]*event.Pair),
- pendingHandles: make(map[uint32]string),
- fdTracker: fdState,
- procFdCache: make(map[uint64]*file.FdFile),
- commResolver: commState,
- prevPairTimes: make(map[uint32]uint64),
- rawHandlers: make(map[types.EventType]rawEventHandler),
- exitHandlers: make(map[reflect.Type]tracepointExitHandler),
- printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
- cfg: cfg,
- done: make(chan struct{}),
+ filter: cfg.filter.Clone(),
+ enterEvs: make(map[uint32]*event.Pair),
+ enterEvAges: make(map[uint32]uint64),
+ pendingHandles: make(map[uint32]string),
+ fdTracker: fdState,
+ procFdCache: make(map[uint64]*file.FdFile),
+ procFdCacheAges: make(map[uint64]uint64),
+ commResolver: commState,
+ prevPairTimes: make(map[uint32]uint64),
+ rawHandlers: make(map[types.EventType]rawEventHandler),
+ exitHandlers: make(map[reflect.Type]tracepointExitHandler),
+ printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
+ cfg: cfg,
+ done: make(chan struct{}),
}
el.initRawHandlers()
el.initExitHandlers()
@@ -582,17 +593,17 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) {
// Schedule comm lookup as early as possible to reduce races for short-lived processes.
e.queueCommLookup(tid)
if !e.filter.UsesCommFilter() {
- e.enterEvs[tid] = event.NewPair(enterEv)
+ e.setEnterEvent(enterEv)
return
}
switch enterEv.(type) {
case *types.OpenEvent:
- e.enterEvs[tid] = event.NewPair(enterEv)
+ e.setEnterEvent(enterEv)
default:
// Only, when we have a comm name
if _, ok := e.cachedComm(tid); ok {
- e.enterEvs[tid] = event.NewPair(enterEv)
+ e.setEnterEvent(enterEv)
} else {
e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv))
}
@@ -600,12 +611,11 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) {
}
func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) {
- ep, ok := e.enterEvs[exitEv.GetTid()]
+ ep, ok := e.consumeEnterEvent(exitEv.GetTid())
if !ok {
exitEv.Recycle()
return
}
- delete(e.enterEvs, exitEv.GetTid())
ep.ExitEv = exitEv
e.numSyscalls++
@@ -1006,16 +1016,23 @@ func (e *eventLoop) resolveFdFile(fd int32, pid uint32) file.File {
}
func (e *eventLoop) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) {
- cache, ok := e.procFdCacheState()[procFdCacheKey(pid, fd)]
+ key := procFdCacheKey(pid, fd)
+ cache, ok := e.procFdCacheState()[key]
+ if ok {
+ e.procFdCacheAgeState()[key] = e.nextCacheAge()
+ }
return cache, ok
}
func (e *eventLoop) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) {
- e.procFdCacheState()[procFdCacheKey(pid, fd)] = resolved
+ key := procFdCacheKey(pid, fd)
+ e.procFdCacheState()[key] = resolved
+ e.procFdCacheAgeState()[key] = e.nextCacheAge()
+ e.pruneProcFdCache()
}
func (e *eventLoop) deleteProcFdCache(fd int32, pid uint32) {
- delete(e.procFdCacheState(), procFdCacheKey(pid, fd))
+ e.deleteProcFdCacheKey(procFdCacheKey(pid, fd))
}
func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) {
@@ -1024,7 +1041,7 @@ func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) {
cachePid := uint32(key >> 32)
cacheFd := int32(uint32(key))
if cachePid == pid && cacheFd >= first {
- delete(cache, key)
+ e.deleteProcFdCacheKey(key)
}
}
}
@@ -1036,6 +1053,143 @@ func (e *eventLoop) procFdCacheState() map[uint64]*file.FdFile {
return e.procFdCache
}
+func (e *eventLoop) procFdCacheAgeState() map[uint64]uint64 {
+ if e.procFdCacheAges == nil {
+ e.procFdCacheAges = make(map[uint64]uint64)
+ }
+ return e.procFdCacheAges
+}
+
+func (e *eventLoop) enterEventAgeState() map[uint32]uint64 {
+ if e.enterEvAges == nil {
+ e.enterEvAges = make(map[uint32]uint64)
+ }
+ return e.enterEvAges
+}
+
+func (e *eventLoop) enterEventState() map[uint32]*event.Pair {
+ if e.enterEvs == nil {
+ e.enterEvs = make(map[uint32]*event.Pair)
+ }
+ return e.enterEvs
+}
+
+func (e *eventLoop) setEnterEvent(enterEv event.Event) {
+ tid := enterEv.GetTid()
+ pair := event.NewPair(enterEv)
+ if prev, ok := e.enterEventState()[tid]; ok && prev != nil {
+ prev.Recycle()
+ }
+ e.enterEventState()[tid] = pair
+ e.enterEventAgeState()[tid] = e.nextCacheAge()
+ e.prunePendingEnterEvents()
+}
+
+func (e *eventLoop) consumeEnterEvent(tid uint32) (*event.Pair, bool) {
+ pair, ok := e.enterEventState()[tid]
+ if !ok {
+ return nil, false
+ }
+ delete(e.enterEventState(), tid)
+ delete(e.enterEventAgeState(), tid)
+ return pair, true
+}
+
+func (e *eventLoop) prunePendingEnterEvents() {
+ state := e.enterEventState()
+ limit := e.pendingEnterLimit()
+ if len(state) <= limit {
+ return
+ }
+ trimOldestPendingPairs(state, e.enterEventAgeState(), trimTarget(limit))
+}
+
+func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64, targetSize int) {
+ excess := len(state) - targetSize
+ if excess <= 0 {
+ return
+ }
+ type pendingPairAge struct {
+ tid uint32
+ age uint64
+ }
+ oldest := make([]pendingPairAge, 0, len(state))
+ for tid := range state {
+ age := ages[tid]
+ oldest = append(oldest, pendingPairAge{tid: tid, age: age})
+ }
+ sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age })
+ for _, entry := range oldest[:excess] {
+ if pair, ok := state[entry.tid]; ok && pair != nil {
+ pair.Recycle()
+ }
+ delete(state, entry.tid)
+ delete(ages, entry.tid)
+ }
+}
+
+func (e *eventLoop) pruneProcFdCache() {
+ state := e.procFdCacheState()
+ limit := e.procFdCacheLimit()
+ if len(state) <= limit {
+ return
+ }
+ trimOldestProcFdEntries(state, e.procFdCacheAgeState(), trimTarget(limit))
+}
+
+func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint64, targetSize int) {
+ excess := len(state) - targetSize
+ if excess <= 0 {
+ return
+ }
+ type procFdAge struct {
+ key uint64
+ age uint64
+ }
+ oldest := make([]procFdAge, 0, len(state))
+ for key := range state {
+ age := ages[key]
+ oldest = append(oldest, procFdAge{key: key, age: age})
+ }
+ sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age })
+ for _, entry := range oldest[:excess] {
+ delete(state, entry.key)
+ delete(ages, entry.key)
+ }
+}
+
+func (e *eventLoop) deleteProcFdCacheKey(key uint64) {
+ delete(e.procFdCacheState(), key)
+ delete(e.procFdCacheAgeState(), key)
+}
+
+func (e *eventLoop) nextCacheAge() uint64 {
+ e.cacheAge++
+ return e.cacheAge
+}
+
+func (e *eventLoop) pendingEnterLimit() int {
+ if e.maxPendingEnterEvs > 0 {
+ return e.maxPendingEnterEvs
+ }
+ return defaultMaxPendingEnterEvs
+}
+
+func (e *eventLoop) procFdCacheLimit() int {
+ if e.maxProcFdCacheSize > 0 {
+ return e.maxProcFdCacheSize
+ }
+ return defaultMaxProcFdCacheSize
+}
+
+func trimTarget(limit int) int {
+ target := limit - limit/cacheTrimDivisor
+ if target < 1 {
+ return 1
+ }
+ return target
+}
+
func procFdCacheKey(pid uint32, fd int32) uint64 {
return uint64(pid)<<32 | uint64(uint32(fd))
}
diff --git a/internal/eventloop_cleanup_test.go b/internal/eventloop_cleanup_test.go
new file mode 100644
index 0000000..f76bfe7
--- /dev/null
+++ b/internal/eventloop_cleanup_test.go
@@ -0,0 +1,88 @@
+package internal
+
+import (
+ "testing"
+
+ "ior/internal/file"
+)
+
+func TestTracepointEnteredPrunesOldestPendingPairs(t *testing.T) {
+ el := &eventLoop{
+ commResolver: newCommResolver(make(map[uint32]string)),
+ maxPendingEnterEvs: 2,
+ }
+
+ enterOne, _ := makeEnterOpenEvent(t, defaulTime, defaultPid, defaultTid)
+ enterTwo, _ := makeEnterOpenEvent(t, defaulTime+1, defaultPid, defaultTid+1)
+ enterThree, _ := makeEnterOpenEvent(t, defaulTime+2, defaultPid, defaultTid+2)
+
+ el.tracepointEntered(&enterOne)
+ el.tracepointEntered(&enterTwo)
+ el.tracepointEntered(&enterThree)
+
+ if _, ok := el.enterEvs[defaultTid]; ok {
+ t.Fatalf("expected oldest pending enter event to be evicted")
+ }
+ if _, ok := el.enterEvs[defaultTid+1]; !ok {
+ t.Fatalf("expected newer pending enter event to be retained")
+ }
+ if _, ok := el.enterEvs[defaultTid+2]; !ok {
+ t.Fatalf("expected newest pending enter event to be retained")
+ }
+ if got := len(el.enterEvAges); got != 2 {
+ t.Fatalf("pending enter metadata size = %d, want 2", got)
+ }
+
+ for _, pair := range el.enterEvs {
+ pair.Recycle()
+ }
+}
+
+func TestConsumeEnterEventClearsPendingPairMetadata(t *testing.T) {
+ el := &eventLoop{}
+
+ enterOne, _ := makeEnterOpenEvent(t, defaulTime, defaultPid, defaultTid)
+ el.setEnterEvent(&enterOne)
+
+ pair, ok := el.consumeEnterEvent(defaultTid)
+ if !ok {
+ t.Fatalf("expected pending enter event to be consumed")
+ }
+ if pair == nil {
+ t.Fatalf("expected consumed pair")
+ }
+ pair.Recycle()
+
+ if _, ok := el.enterEvs[defaultTid]; ok {
+ t.Fatalf("expected pending enter pair to be removed")
+ }
+ if _, ok := el.enterEvAges[defaultTid]; ok {
+ t.Fatalf("expected pending enter metadata to be removed")
+ }
+}
+
+func TestProcFdCacheRetainsRecentlyUsedEntries(t *testing.T) {
+ el := &eventLoop{maxProcFdCacheSize: 2}
+
+ el.setProcFdCache(10, defaultPid, file.NewFdWithPid(10, defaultPid))
+ el.setProcFdCache(11, defaultPid, file.NewFdWithPid(11, defaultPid))
+
+ if _, ok := el.cachedProcFdFile(10, defaultPid); !ok {
+ t.Fatalf("expected first cache entry to exist before refresh")
+ }
+
+ el.setProcFdCache(12, defaultPid, file.NewFdWithPid(12, defaultPid))
+
+ if _, ok := el.cachedProcFdFile(10, defaultPid); !ok {
+ t.Fatalf("expected recently used cache entry to be retained")
+ }
+ if _, ok := el.cachedProcFdFile(11, defaultPid); ok {
+ t.Fatalf("expected least recently used cache entry to be evicted")
+ }
+ if _, ok := el.cachedProcFdFile(12, defaultPid); !ok {
+ t.Fatalf("expected newest cache entry to be retained")
+ }
+ if got := len(el.procFdCacheAges); got != 2 {
+ t.Fatalf("proc fd cache metadata size = %d, want 2", got)
+ }
+}