diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:33:15 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:33:15 +0200 |
| commit | 775d3e59c7a6c060d0a9ecf3536c0df383d241be (patch) | |
| tree | 981a6550849039ef4c443dce38a05829b79a11df | |
| parent | 2efe5330cb43f685f3159a28ce211392a0bbe3c3 (diff) | |
refactor: extract shared syscall stream row model
| -rw-r--r-- | internal/ior.go | 5 | ||||
| -rw-r--r-- | internal/streamrow/row.go | 137 | ||||
| -rw-r--r-- | internal/streamrow/row_test.go | 48 | ||||
| -rw-r--r-- | internal/tui/eventstream/streamevent.go | 107 | ||||
| -rw-r--r-- | internal/tui/eventstream/streamevent_test.go | 6 |
5 files changed, 201 insertions, 102 deletions
diff --git a/internal/ior.go b/internal/ior.go index 8ad82c0..ea44fea 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -212,6 +212,7 @@ func tuiTraceStarterFromRunTrace( bindings.SetLiveTrie(liveTrie) } streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize) + streamSeq := eventstream.NewSequencer(0) go func() { for ev := range streamEvents { @@ -230,7 +231,7 @@ func tuiTraceStarterFromRunTrace( return } engine.Ingest(ep) - streamEvents <- eventstream.NewStreamEvent(ep.EnterEv.GetTime(), ep) + streamEvents <- eventstream.NewStreamEvent(streamSeq.Next(), ep) liveTrie.Ingest(ep) // Both downstream consumers snapshot the pair synchronously, so // the pooled pair can be recycled immediately afterwards. @@ -239,7 +240,7 @@ func tuiTraceStarterFromRunTrace( el.warningCb = func(message string) { // Drop warning notifications if the stream channel is saturated. select { - case streamEvents <- eventstream.NewWarningEvent(message): + case streamEvents <- eventstream.NewWarningEvent(streamSeq.Next(), message): default: } } diff --git a/internal/streamrow/row.go b/internal/streamrow/row.go new file mode 100644 index 0000000..457fcc0 --- /dev/null +++ b/internal/streamrow/row.go @@ -0,0 +1,137 @@ +package streamrow + +import ( + "sync/atomic" + "time" + + "ior/internal/event" + "ior/internal/types" +) + +// Row is the shared syscall stream row model used by live TUI views, +// snapshot export, and future recording outputs. +type Row struct { + Seq uint64 + TimeNs uint64 + Syscall string + Comm string + PID uint32 + TID uint32 + FileName string + DurationNs uint64 + GapNs uint64 + Bytes uint64 + RetVal int64 + IsError bool + FD int32 +} + +func (r Row) SyscallValue() string { + return r.Syscall +} + +func (r Row) CommValue() string { + return r.Comm +} + +func (r Row) FileValue() string { + return r.FileName +} + +func (r Row) PIDValue() uint32 { + return r.PID +} + +func (r Row) TIDValue() uint32 { + return r.TID +} + +func (r Row) FDValue() int32 { + return r.FD +} + +func (r Row) LatencyValue() uint64 { + return r.DurationNs +} + +func (r Row) GapValue() uint64 { + return r.GapNs +} + +func (r Row) BytesValue() uint64 { + return r.Bytes +} + +func (r Row) ReturnValue() int64 { + return r.RetVal +} + +func (r Row) ErrorValue() bool { + return r.IsError +} + +// UnknownFD marks events that are not associated with a file descriptor. +const UnknownFD int32 = -1 + +// Sequencer hands out strictly increasing row sequence numbers. +type Sequencer struct { + next atomic.Uint64 +} + +// NewSequencer constructs a monotonic sequence generator. The first call to +// Next returns start+1. +func NewSequencer(start uint64) *Sequencer { + s := &Sequencer{} + s.next.Store(start) + return s +} + +// Next returns the next sequence number. +func (s *Sequencer) Next() uint64 { + if s == nil { + return 0 + } + return s.next.Add(1) +} + +// New converts one syscall pair into the shared row model. +func New(seq uint64, pair *event.Pair) Row { + row := Row{ + Seq: seq, + TimeNs: pair.EnterEv.GetTime(), + Syscall: pair.EnterEv.GetTraceId().Name(), + Comm: pair.Comm, + PID: pair.EnterEv.GetPid(), + TID: pair.EnterEv.GetTid(), + FileName: pair.FileName(), + DurationNs: pair.Duration, + GapNs: pair.DurationToPrev, + Bytes: pair.Bytes, + FD: UnknownFD, + } + if fd, ok := pair.FileDescriptor(); ok { + row.FD = fd + } + + if retEv, ok := pair.ExitEv.(*types.RetEvent); ok { + row.RetVal = retEv.Ret + row.IsError = retEv.Ret < 0 + } + + return row +} + +// NewWarning creates a synthetic row for non-fatal runtime warnings. +func NewWarning(seq uint64, message string) Row { + now := uint64(time.Now().UnixNano()) + return Row{ + Seq: seq, + TimeNs: now, + Syscall: "warning", + Comm: "ior", + FileName: message, + FD: UnknownFD, + RetVal: -1, + IsError: true, + } +} diff --git a/internal/streamrow/row_test.go b/internal/streamrow/row_test.go new file mode 100644 index 0000000..729ba94 --- /dev/null +++ b/internal/streamrow/row_test.go @@ -0,0 +1,48 @@ +package streamrow + +import ( + "sync" + "testing" +) + +func TestSequencerStartsAfterSeed(t *testing.T) { + seq := NewSequencer(41) + if got, want := seq.Next(), uint64(42); got != want { + t.Fatalf("first Next() = %d, want %d", got, want) + } + if got, want := seq.Next(), uint64(43); got != want { + t.Fatalf("second Next() = %d, want %d", got, want) + } +} + +func TestSequencerIsMonotonicUnderConcurrency(t *testing.T) { + seq := NewSequencer(0) + + const workers = 8 + const perWorker = 64 + + got := make(chan uint64, workers*perWorker) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < perWorker; j++ { + got <- seq.Next() + } + }() + } + wg.Wait() + close(got) + + seen := make(map[uint64]struct{}, workers*perWorker) + for n := range got { + if _, ok := seen[n]; ok { + t.Fatalf("duplicate sequence number %d", n) + } + seen[n] = struct{}{} + } + if got, want := len(seen), workers*perWorker; got != want { + t.Fatalf("unique sequence count = %d, want %d", got, want) + } +} diff --git a/internal/tui/eventstream/streamevent.go b/internal/tui/eventstream/streamevent.go index 9238a84..241b984 100644 --- a/internal/tui/eventstream/streamevent.go +++ b/internal/tui/eventstream/streamevent.go @@ -1,112 +1,25 @@ package eventstream import ( - "time" - "ior/internal/event" - "ior/internal/types" + "ior/internal/streamrow" ) -type StreamEvent struct { - Seq uint64 - TimeNs uint64 - Syscall string - Comm string - PID uint32 - TID uint32 - FileName string - DurationNs uint64 - GapNs uint64 - Bytes uint64 - RetVal int64 - IsError bool - FD int32 -} - -func (e StreamEvent) SyscallValue() string { - return e.Syscall -} - -func (e StreamEvent) CommValue() string { - return e.Comm -} - -func (e StreamEvent) FileValue() string { - return e.FileName -} - -func (e StreamEvent) PIDValue() uint32 { - return e.PID -} - -func (e StreamEvent) TIDValue() uint32 { - return e.TID -} - -func (e StreamEvent) FDValue() int32 { - return e.FD -} - -func (e StreamEvent) LatencyValue() uint64 { - return e.DurationNs -} - -func (e StreamEvent) GapValue() uint64 { - return e.GapNs -} - -func (e StreamEvent) BytesValue() uint64 { - return e.Bytes -} +type StreamEvent = streamrow.Row +type Sequencer = streamrow.Sequencer -func (e StreamEvent) ReturnValue() int64 { - return e.RetVal -} +// UnknownFD marks events that are not associated with a file descriptor. +const UnknownFD = streamrow.UnknownFD -func (e StreamEvent) ErrorValue() bool { - return e.IsError +func NewSequencer(start uint64) *Sequencer { + return streamrow.NewSequencer(start) } -// UnknownFD marks events that are not associated with a file descriptor. -const UnknownFD int32 = -1 - func NewStreamEvent(seq uint64, pair *event.Pair) StreamEvent { - e := StreamEvent{ - Seq: seq, - TimeNs: pair.EnterEv.GetTime(), - Syscall: pair.EnterEv.GetTraceId().Name(), - Comm: pair.Comm, - PID: pair.EnterEv.GetPid(), - TID: pair.EnterEv.GetTid(), - FileName: pair.FileName(), - DurationNs: pair.Duration, - GapNs: pair.DurationToPrev, - Bytes: pair.Bytes, - FD: UnknownFD, - } - if fd, ok := pair.FileDescriptor(); ok { - e.FD = fd - } - - if retEv, ok := pair.ExitEv.(*types.RetEvent); ok { - e.RetVal = retEv.Ret - e.IsError = retEv.Ret < 0 - } - - return e + return streamrow.New(seq, pair) } // NewWarningEvent creates a synthetic stream row for non-fatal runtime warnings. -func NewWarningEvent(message string) StreamEvent { - now := uint64(time.Now().UnixNano()) - return StreamEvent{ - Seq: now, - TimeNs: now, - Syscall: "warning", - Comm: "ior", - FileName: message, - FD: UnknownFD, - RetVal: -1, - IsError: true, - } +func NewWarningEvent(seq uint64, message string) StreamEvent { + return streamrow.NewWarning(seq, message) } diff --git a/internal/tui/eventstream/streamevent_test.go b/internal/tui/eventstream/streamevent_test.go index dd65dd1..a1df8a1 100644 --- a/internal/tui/eventstream/streamevent_test.go +++ b/internal/tui/eventstream/streamevent_test.go @@ -97,7 +97,7 @@ func TestNewStreamEventWithoutRetEvent(t *testing.T) { } func TestNewWarningEventPopulatesFields(t *testing.T) { - got := NewWarningEvent("Dropped malformed event") + got := NewWarningEvent(7, "Dropped malformed event") if got.Syscall != "warning" { t.Fatalf("Syscall = %q, want warning", got.Syscall) @@ -117,7 +117,7 @@ func TestNewWarningEventPopulatesFields(t *testing.T) { if !got.IsError { t.Fatalf("IsError = false, want true") } - if got.Seq == 0 || got.TimeNs == 0 { - t.Fatalf("Seq/TimeNs = %d/%d, want non-zero", got.Seq, got.TimeNs) + if got.Seq != 7 || got.TimeNs == 0 { + t.Fatalf("Seq/TimeNs = %d/%d, want 7/non-zero", got.Seq, got.TimeNs) } } |
