summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-12 23:33:15 +0200
committerPaul Buetow <paul@buetow.org>2026-03-12 23:33:15 +0200
commit775d3e59c7a6c060d0a9ecf3536c0df383d241be (patch)
tree981a6550849039ef4c443dce38a05829b79a11df
parent2efe5330cb43f685f3159a28ce211392a0bbe3c3 (diff)
refactor: extract shared syscall stream row model
-rw-r--r--internal/ior.go5
-rw-r--r--internal/streamrow/row.go137
-rw-r--r--internal/streamrow/row_test.go48
-rw-r--r--internal/tui/eventstream/streamevent.go107
-rw-r--r--internal/tui/eventstream/streamevent_test.go6
5 files changed, 201 insertions, 102 deletions
diff --git a/internal/ior.go b/internal/ior.go
index 8ad82c0..ea44fea 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -212,6 +212,7 @@ func tuiTraceStarterFromRunTrace(
bindings.SetLiveTrie(liveTrie)
}
streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize)
+ streamSeq := eventstream.NewSequencer(0)
go func() {
for ev := range streamEvents {
@@ -230,7 +231,7 @@ func tuiTraceStarterFromRunTrace(
return
}
engine.Ingest(ep)
- streamEvents <- eventstream.NewStreamEvent(ep.EnterEv.GetTime(), ep)
+ streamEvents <- eventstream.NewStreamEvent(streamSeq.Next(), ep)
liveTrie.Ingest(ep)
// Both downstream consumers snapshot the pair synchronously, so
// the pooled pair can be recycled immediately afterwards.
@@ -239,7 +240,7 @@ func tuiTraceStarterFromRunTrace(
el.warningCb = func(message string) {
// Drop warning notifications if the stream channel is saturated.
select {
- case streamEvents <- eventstream.NewWarningEvent(message):
+ case streamEvents <- eventstream.NewWarningEvent(streamSeq.Next(), message):
default:
}
}
diff --git a/internal/streamrow/row.go b/internal/streamrow/row.go
new file mode 100644
index 0000000..457fcc0
--- /dev/null
+++ b/internal/streamrow/row.go
@@ -0,0 +1,137 @@
+package streamrow
+
+import (
+ "sync/atomic"
+ "time"
+
+ "ior/internal/event"
+ "ior/internal/types"
+)
+
+// Row is the shared syscall stream row model used by live TUI views,
+// snapshot export, and future recording outputs.
+type Row struct {
+ Seq uint64
+ TimeNs uint64
+ Syscall string
+ Comm string
+ PID uint32
+ TID uint32
+ FileName string
+ DurationNs uint64
+ GapNs uint64
+ Bytes uint64
+ RetVal int64
+ IsError bool
+ FD int32
+}
+
+func (r Row) SyscallValue() string {
+ return r.Syscall
+}
+
+func (r Row) CommValue() string {
+ return r.Comm
+}
+
+func (r Row) FileValue() string {
+ return r.FileName
+}
+
+func (r Row) PIDValue() uint32 {
+ return r.PID
+}
+
+func (r Row) TIDValue() uint32 {
+ return r.TID
+}
+
+func (r Row) FDValue() int32 {
+ return r.FD
+}
+
+func (r Row) LatencyValue() uint64 {
+ return r.DurationNs
+}
+
+func (r Row) GapValue() uint64 {
+ return r.GapNs
+}
+
+func (r Row) BytesValue() uint64 {
+ return r.Bytes
+}
+
+func (r Row) ReturnValue() int64 {
+ return r.RetVal
+}
+
+func (r Row) ErrorValue() bool {
+ return r.IsError
+}
+
+// UnknownFD marks events that are not associated with a file descriptor.
+const UnknownFD int32 = -1
+
+// Sequencer hands out strictly increasing row sequence numbers.
+type Sequencer struct {
+ next atomic.Uint64
+}
+
+// NewSequencer constructs a monotonic sequence generator. The first call to
+// Next returns start+1.
+func NewSequencer(start uint64) *Sequencer {
+ s := &Sequencer{}
+ s.next.Store(start)
+ return s
+}
+
+// Next returns the next sequence number.
+func (s *Sequencer) Next() uint64 {
+ if s == nil {
+ return 0
+ }
+ return s.next.Add(1)
+}
+
+// New converts one syscall pair into the shared row model.
+func New(seq uint64, pair *event.Pair) Row {
+ row := Row{
+ Seq: seq,
+ TimeNs: pair.EnterEv.GetTime(),
+ Syscall: pair.EnterEv.GetTraceId().Name(),
+ Comm: pair.Comm,
+ PID: pair.EnterEv.GetPid(),
+ TID: pair.EnterEv.GetTid(),
+ FileName: pair.FileName(),
+ DurationNs: pair.Duration,
+ GapNs: pair.DurationToPrev,
+ Bytes: pair.Bytes,
+ FD: UnknownFD,
+ }
+ if fd, ok := pair.FileDescriptor(); ok {
+ row.FD = fd
+ }
+
+ if retEv, ok := pair.ExitEv.(*types.RetEvent); ok {
+ row.RetVal = retEv.Ret
+ row.IsError = retEv.Ret < 0
+ }
+
+ return row
+}
+
+// NewWarning creates a synthetic row for non-fatal runtime warnings.
+func NewWarning(seq uint64, message string) Row {
+ now := uint64(time.Now().UnixNano())
+ return Row{
+ Seq: seq,
+ TimeNs: now,
+ Syscall: "warning",
+ Comm: "ior",
+ FileName: message,
+ FD: UnknownFD,
+ RetVal: -1,
+ IsError: true,
+ }
+}
diff --git a/internal/streamrow/row_test.go b/internal/streamrow/row_test.go
new file mode 100644
index 0000000..729ba94
--- /dev/null
+++ b/internal/streamrow/row_test.go
@@ -0,0 +1,48 @@
+package streamrow
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestSequencerStartsAfterSeed(t *testing.T) {
+ seq := NewSequencer(41)
+ if got, want := seq.Next(), uint64(42); got != want {
+ t.Fatalf("first Next() = %d, want %d", got, want)
+ }
+ if got, want := seq.Next(), uint64(43); got != want {
+ t.Fatalf("second Next() = %d, want %d", got, want)
+ }
+}
+
+func TestSequencerIsMonotonicUnderConcurrency(t *testing.T) {
+ seq := NewSequencer(0)
+
+ const workers = 8
+ const perWorker = 64
+
+ got := make(chan uint64, workers*perWorker)
+ var wg sync.WaitGroup
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for j := 0; j < perWorker; j++ {
+ got <- seq.Next()
+ }
+ }()
+ }
+ wg.Wait()
+ close(got)
+
+ seen := make(map[uint64]struct{}, workers*perWorker)
+ for n := range got {
+ if _, ok := seen[n]; ok {
+ t.Fatalf("duplicate sequence number %d", n)
+ }
+ seen[n] = struct{}{}
+ }
+ if got, want := len(seen), workers*perWorker; got != want {
+ t.Fatalf("unique sequence count = %d, want %d", got, want)
+ }
+}
diff --git a/internal/tui/eventstream/streamevent.go b/internal/tui/eventstream/streamevent.go
index 9238a84..241b984 100644
--- a/internal/tui/eventstream/streamevent.go
+++ b/internal/tui/eventstream/streamevent.go
@@ -1,112 +1,25 @@
package eventstream
import (
- "time"
-
"ior/internal/event"
- "ior/internal/types"
+ "ior/internal/streamrow"
)
-type StreamEvent struct {
- Seq uint64
- TimeNs uint64
- Syscall string
- Comm string
- PID uint32
- TID uint32
- FileName string
- DurationNs uint64
- GapNs uint64
- Bytes uint64
- RetVal int64
- IsError bool
- FD int32
-}
-
-func (e StreamEvent) SyscallValue() string {
- return e.Syscall
-}
-
-func (e StreamEvent) CommValue() string {
- return e.Comm
-}
-
-func (e StreamEvent) FileValue() string {
- return e.FileName
-}
-
-func (e StreamEvent) PIDValue() uint32 {
- return e.PID
-}
-
-func (e StreamEvent) TIDValue() uint32 {
- return e.TID
-}
-
-func (e StreamEvent) FDValue() int32 {
- return e.FD
-}
-
-func (e StreamEvent) LatencyValue() uint64 {
- return e.DurationNs
-}
-
-func (e StreamEvent) GapValue() uint64 {
- return e.GapNs
-}
-
-func (e StreamEvent) BytesValue() uint64 {
- return e.Bytes
-}
+type StreamEvent = streamrow.Row
+type Sequencer = streamrow.Sequencer
-func (e StreamEvent) ReturnValue() int64 {
- return e.RetVal
-}
+// UnknownFD marks events that are not associated with a file descriptor.
+const UnknownFD = streamrow.UnknownFD
-func (e StreamEvent) ErrorValue() bool {
- return e.IsError
+func NewSequencer(start uint64) *Sequencer {
+ return streamrow.NewSequencer(start)
}
-// UnknownFD marks events that are not associated with a file descriptor.
-const UnknownFD int32 = -1
-
func NewStreamEvent(seq uint64, pair *event.Pair) StreamEvent {
- e := StreamEvent{
- Seq: seq,
- TimeNs: pair.EnterEv.GetTime(),
- Syscall: pair.EnterEv.GetTraceId().Name(),
- Comm: pair.Comm,
- PID: pair.EnterEv.GetPid(),
- TID: pair.EnterEv.GetTid(),
- FileName: pair.FileName(),
- DurationNs: pair.Duration,
- GapNs: pair.DurationToPrev,
- Bytes: pair.Bytes,
- FD: UnknownFD,
- }
- if fd, ok := pair.FileDescriptor(); ok {
- e.FD = fd
- }
-
- if retEv, ok := pair.ExitEv.(*types.RetEvent); ok {
- e.RetVal = retEv.Ret
- e.IsError = retEv.Ret < 0
- }
-
- return e
+ return streamrow.New(seq, pair)
}
// NewWarningEvent creates a synthetic stream row for non-fatal runtime warnings.
-func NewWarningEvent(message string) StreamEvent {
- now := uint64(time.Now().UnixNano())
- return StreamEvent{
- Seq: now,
- TimeNs: now,
- Syscall: "warning",
- Comm: "ior",
- FileName: message,
- FD: UnknownFD,
- RetVal: -1,
- IsError: true,
- }
+func NewWarningEvent(seq uint64, message string) StreamEvent {
+ return streamrow.NewWarning(seq, message)
}
diff --git a/internal/tui/eventstream/streamevent_test.go b/internal/tui/eventstream/streamevent_test.go
index dd65dd1..a1df8a1 100644
--- a/internal/tui/eventstream/streamevent_test.go
+++ b/internal/tui/eventstream/streamevent_test.go
@@ -97,7 +97,7 @@ func TestNewStreamEventWithoutRetEvent(t *testing.T) {
}
func TestNewWarningEventPopulatesFields(t *testing.T) {
- got := NewWarningEvent("Dropped malformed event")
+ got := NewWarningEvent(7, "Dropped malformed event")
if got.Syscall != "warning" {
t.Fatalf("Syscall = %q, want warning", got.Syscall)
@@ -117,7 +117,7 @@ func TestNewWarningEventPopulatesFields(t *testing.T) {
if !got.IsError {
t.Fatalf("IsError = false, want true")
}
- if got.Seq == 0 || got.TimeNs == 0 {
- t.Fatalf("Seq/TimeNs = %d/%d, want non-zero", got.Seq, got.TimeNs)
+ if got.Seq != 7 || got.TimeNs == 0 {
+ t.Fatalf("Seq/TimeNs = %d/%d, want 7/non-zero", got.Seq, got.TimeNs)
}
}