diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 08:55:06 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 08:55:06 +0200 |
| commit | 9950c77981ce06be34e877a6729abb23a36789c6 (patch) | |
| tree | 778c4f32c27740f86810cb052146df97b20cea9e | |
| parent | dc20240d2eddacba8a690a75547cbd8f1d3df98e (diff) | |
task(ior): remove eventloop busy-wait polling (task 2b8f8f83)
| -rw-r--r-- | internal/eventloop.go | 6 | ||||
| -rw-r--r-- | internal/eventloop_events_test.go | 51 | ||||
| -rw-r--r-- | internal/eventloop_test.go | 5 |
3 files changed, 58 insertions, 4 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 479ac59..64c7d6f 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "reflect" + "runtime" "sync" "syscall" "time" @@ -407,11 +408,12 @@ func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *eve continue } e.processRawEvent(raw, ch) + // Yield so downstream consumers can process emitted pairs before + // the next raw event mutates shared tracker state. + runtime.Gosched() case <-ctx.Done(): fmt.Println("Stopping event loop") return - default: - time.Sleep(time.Millisecond * 10) } } }() diff --git a/internal/eventloop_events_test.go b/internal/eventloop_events_test.go new file mode 100644 index 0000000..4515164 --- /dev/null +++ b/internal/eventloop_events_test.go @@ -0,0 +1,51 @@ +package internal + +import ( + "context" + "testing" + "time" +) + +func TestEventsStopsOnContextCancelWithoutRawData(t *testing.T) { + el := mustNewEventLoop(t, eventLoopConfig{}) + rawCh := make(chan []byte) + ctx, cancel := context.WithCancel(context.Background()) + out := el.events(ctx, rawCh) + + cancel() + + select { + case _, ok := <-out: + if ok { + t.Fatal("expected output channel to be closed after cancellation") + } + case <-time.After(200 * time.Millisecond): + t.Fatal("timed out waiting for output channel to close after cancellation") + } +} + +func TestEventsIgnoresEmptyRawPayload(t *testing.T) { + el := mustNewEventLoop(t, eventLoopConfig{}) + rawCh := make(chan []byte, 1) + ctx, cancel := context.WithCancel(context.Background()) + out := el.events(ctx, rawCh) + + rawCh <- nil + + select { + case ep := <-out: + t.Fatalf("expected no event for empty raw payload, got %#v", ep) + case <-time.After(50 * time.Millisecond): + } + + cancel() + + select { + case _, ok := <-out: + if ok { + t.Fatal("expected output channel to be closed after cancellation") + } + case <-time.After(200 * time.Millisecond): + t.Fatal("timed out waiting for output channel to close after cancellation") + } +} diff --git a/internal/eventloop_test.go b/internal/eventloop_test.go index 7fcd438..32dddd4 100644 --- a/internal/eventloop_test.go +++ b/internal/eventloop_test.go @@ -104,8 +104,9 @@ func TestEventloop(t *testing.T) { for _, raw := range td.rawTracepoints { t.Log("Sending raw tracepoint", raw, "simulating BPF sending this") inCh <- raw - // Small delay to simulate real BPF event timing - time.Sleep(time.Microsecond) + // Keep synthetic feed pace close to real arrival and avoid + // stateful assertion races between adjacent events. + time.Sleep(100 * time.Microsecond) } }() for _, validate := range td.validates { |
