summaryrefslogtreecommitdiff
path: root/internal/eventloop.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/eventloop.go')
-rw-r--r--internal/eventloop.go44
1 files changed, 38 insertions, 6 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 2def4c0..479ac59 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -73,11 +73,14 @@ type commResolver struct {
mu sync.RWMutex
pending map[uint32]struct{}
+ closed bool
lookupQueue chan uint32
lookupWorkers int
resolveFn func(uint32) string
startWorkersOnce sync.Once
+ workersWG sync.WaitGroup
+ shutdownOnce sync.Once
}
func newCommResolver(comms map[uint32]string) *commResolver {
@@ -106,14 +109,22 @@ func (r *commResolver) ensureLookupConfig() {
func (r *commResolver) startLookupWorkers() {
r.ensureLookupConfig()
+ r.mu.RLock()
+ closed := r.closed
+ r.mu.RUnlock()
+ if closed {
+ return
+ }
r.startWorkersOnce.Do(func() {
for i := 0; i < r.lookupWorkers; i++ {
+ r.workersWG.Add(1)
go r.lookupWorker()
}
})
}
func (r *commResolver) lookupWorker() {
+ defer r.workersWG.Done()
for tid := range r.lookupQueue {
comm := r.resolveFn(tid)
r.mu.Lock()
@@ -176,31 +187,51 @@ func (r *commResolver) queueLookup(tid uint32) {
if tid == 0 {
return
}
+ r.startLookupWorkers()
+
r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.closed {
+ return
+ }
if _, ok := r.comms[tid]; ok {
- r.mu.Unlock()
return
}
if r.pending == nil {
r.pending = make(map[uint32]struct{})
}
if _, ok := r.pending[tid]; ok {
- r.mu.Unlock()
return
}
r.pending[tid] = struct{}{}
- r.mu.Unlock()
-
- r.startLookupWorkers()
// Keep event processing non-blocking if resolver workers are saturated.
select {
case r.lookupQueue <- tid:
default:
- r.mu.Lock()
delete(r.pending, tid)
+ }
+}
+
+func (r *commResolver) shutdown() {
+ r.shutdownOnce.Do(func() {
+ r.ensureLookupConfig()
+ r.mu.Lock()
+ r.closed = true
+ for tid := range r.pending {
+ delete(r.pending, tid)
+ }
+ queue := r.lookupQueue
r.mu.Unlock()
+ close(queue)
+ r.workersWG.Wait()
+ })
+}
+func (e *eventLoop) shutdownCommResolver() {
+ if e.commResolver == nil {
+ return
}
+ e.commResolver.shutdown()
}
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
@@ -341,6 +372,7 @@ func (e *eventLoop) stats() string {
func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
defer close(e.done)
+ defer e.shutdownCommResolver()
if e.cfg.pprofEnable {
fmt.Println("Profiling, press Ctrl+C to stop")