diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 22:05:25 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 22:05:25 +0200 |
| commit | 81fdd28081922aaeb355a1f87cebaf85f93622c5 (patch) | |
| tree | efb747ac5995a4266f5e46cebdf3fef6c5ca8a26 /internal/eventloop_comm.go | |
| parent | 0d4354bab36c95cd4e8125d2d7b5b66de4ae5d11 (diff) | |
Refactor event loop into focused units (task 389)
Diffstat (limited to 'internal/eventloop_comm.go')
| -rw-r--r-- | internal/eventloop_comm.go | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/internal/eventloop_comm.go b/internal/eventloop_comm.go new file mode 100644 index 0000000..6b77023 --- /dev/null +++ b/internal/eventloop_comm.go @@ -0,0 +1,213 @@ +package internal + +import ( + "os" + "path/filepath" + "strconv" + "sync" +) + +type commResolver struct { + comms map[uint32]string + + mu sync.RWMutex + pending map[uint32]struct{} + closed bool + + lookupQueue chan uint32 + lookupWorkers int + resolveFn func(uint32) string + startWorkersOnce sync.Once + workersWG sync.WaitGroup + shutdownOnce sync.Once +} + +func newCommResolver(comms map[uint32]string) *commResolver { + if comms == nil { + comms = make(map[uint32]string) + } + r := &commResolver{ + comms: comms, + pending: make(map[uint32]struct{}), + } + r.ensureLookupConfig() + return r +} + +func (r *commResolver) ensureLookupConfig() { + if r.lookupWorkers <= 0 { + r.lookupWorkers = defaultCommLookupWorkers + } + if r.lookupQueue == nil { + r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize) + } + if r.resolveFn == nil { + r.resolveFn = resolveCommFromProc + } +} + +func (r *commResolver) startLookupWorkers() { + r.ensureLookupConfig() + r.mu.RLock() + closed := r.closed + r.mu.RUnlock() + if closed { + return + } + r.startWorkersOnce.Do(func() { + for i := 0; i < r.lookupWorkers; i++ { + r.workersWG.Add(1) + go r.lookupWorker() + } + }) +} + +func (r *commResolver) lookupWorker() { + defer r.workersWG.Done() + for tid := range r.lookupQueue { + comm := r.resolveFn(tid) + r.mu.Lock() + delete(r.pending, tid) + if comm != "" { + r.comms[tid] = comm + } + r.mu.Unlock() + } +} + +func (r *commResolver) seedTrackedPidComm(pidFilter int) { + candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())} + if pidFilter > 0 { + candidates = append(candidates, uint32(pidFilter)) + } + + seen := make(map[uint32]struct{}, len(candidates)) + for _, tid := range candidates { + if tid == 0 { + continue + } + if _, ok := seen[tid]; ok { + continue + } + seen[tid] = struct{}{} + if comm := resolveCommFromProc(tid); comm != "" { + r.setCached(tid, comm) + continue + } + r.queueLookup(tid) + } +} + +func (r *commResolver) comm(tid uint32) string { + if comm, ok := r.cached(tid); ok { + return comm + } + r.queueLookup(tid) + return "" +} + +func (r *commResolver) cached(tid uint32) (string, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + comm, ok := r.comms[tid] + return comm, ok +} + +func (r *commResolver) setCached(tid uint32, comm string) { + if comm == "" { + return + } + r.mu.Lock() + r.comms[tid] = comm + r.mu.Unlock() +} + +func (r *commResolver) queueLookup(tid uint32) { + if tid == 0 { + return + } + r.startLookupWorkers() + + r.mu.Lock() + defer r.mu.Unlock() + if r.closed { + return + } + if _, ok := r.comms[tid]; ok { + return + } + if r.pending == nil { + r.pending = make(map[uint32]struct{}) + } + if _, ok := r.pending[tid]; ok { + return + } + r.pending[tid] = struct{}{} + + // Keep event processing non-blocking if resolver workers are saturated. + select { + case r.lookupQueue <- tid: + default: + delete(r.pending, tid) + } +} + +func (r *commResolver) shutdown() { + r.shutdownOnce.Do(func() { + r.ensureLookupConfig() + r.mu.Lock() + r.closed = true + for tid := range r.pending { + delete(r.pending, tid) + } + queue := r.lookupQueue + r.mu.Unlock() + close(queue) + r.workersWG.Wait() + }) +} + +func (e *eventLoop) shutdownCommResolver() { + if e.commResolver == nil { + return + } + e.commResolver.shutdown() +} + +func (e *eventLoop) comm(tid uint32) string { + return e.commState().comm(tid) +} + +func (e *eventLoop) cachedComm(tid uint32) (string, bool) { + return e.commState().cached(tid) +} + +func (e *eventLoop) setCachedComm(tid uint32, comm string) { + e.commState().setCached(tid, comm) +} + +func (e *eventLoop) queueCommLookup(tid uint32) { + e.commState().queueLookup(tid) +} + +func procTidPathPrefix(tid uint32) string { + return "/proc/" + strconv.FormatUint(uint64(tid), 10) +} + +func resolveCommFromProc(tid uint32) string { + procPath := procTidPathPrefix(tid) + if data, err := os.ReadFile(procPath + "/comm"); err == nil { + comm := string(data) + if len(comm) > 0 && comm[len(comm)-1] == '\n' { + comm = comm[:len(comm)-1] + } + if comm != "" { + return comm + } + } + if linkName, err := os.Readlink(procPath + "/exe"); err == nil { + linkName = filepath.Base(linkName) + return linkName + } + return "" +} |
