diff options
Diffstat (limited to 'internal/eventloop.go')
| -rw-r--r-- | internal/eventloop.go | 44 |
1 files changed, 38 insertions, 6 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 2def4c0..479ac59 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -73,11 +73,14 @@ type commResolver struct { mu sync.RWMutex pending map[uint32]struct{} + closed bool lookupQueue chan uint32 lookupWorkers int resolveFn func(uint32) string startWorkersOnce sync.Once + workersWG sync.WaitGroup + shutdownOnce sync.Once } func newCommResolver(comms map[uint32]string) *commResolver { @@ -106,14 +109,22 @@ func (r *commResolver) ensureLookupConfig() { func (r *commResolver) startLookupWorkers() { r.ensureLookupConfig() + r.mu.RLock() + closed := r.closed + r.mu.RUnlock() + if closed { + return + } r.startWorkersOnce.Do(func() { for i := 0; i < r.lookupWorkers; i++ { + r.workersWG.Add(1) go r.lookupWorker() } }) } func (r *commResolver) lookupWorker() { + defer r.workersWG.Done() for tid := range r.lookupQueue { comm := r.resolveFn(tid) r.mu.Lock() @@ -176,31 +187,51 @@ func (r *commResolver) queueLookup(tid uint32) { if tid == 0 { return } + r.startLookupWorkers() + r.mu.Lock() + defer r.mu.Unlock() + if r.closed { + return + } if _, ok := r.comms[tid]; ok { - r.mu.Unlock() return } if r.pending == nil { r.pending = make(map[uint32]struct{}) } if _, ok := r.pending[tid]; ok { - r.mu.Unlock() return } r.pending[tid] = struct{}{} - r.mu.Unlock() - - r.startLookupWorkers() // Keep event processing non-blocking if resolver workers are saturated. select { case r.lookupQueue <- tid: default: - r.mu.Lock() delete(r.pending, tid) + } +} + +func (r *commResolver) shutdown() { + r.shutdownOnce.Do(func() { + r.ensureLookupConfig() + r.mu.Lock() + r.closed = true + for tid := range r.pending { + delete(r.pending, tid) + } + queue := r.lookupQueue r.mu.Unlock() + close(queue) + r.workersWG.Wait() + }) +} +func (e *eventLoop) shutdownCommResolver() { + if e.commResolver == nil { + return } + e.commResolver.shutdown() } type rawEventHandler func(raw []byte, ch chan<- *event.Pair) @@ -341,6 +372,7 @@ func (e *eventLoop) stats() string { func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { defer close(e.done) + defer e.shutdownCommResolver() if e.cfg.pprofEnable { fmt.Println("Profiling, press Ctrl+C to stop") |
