summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-12 23:47:23 +0200
committerPaul Buetow <paul@buetow.org>2026-03-12 23:47:23 +0200
commit767c2b54779cbf81b68217c6e83868cffb6a0965 (patch)
treed657bcb88f22357f506a5134227d87958190f07a /internal
parent775d3e59c7a6c060d0a9ecf3536c0df383d241be (diff)
feat: add parquet recorder foundation
Diffstat (limited to 'internal')
-rw-r--r--internal/parquet/recorder.go372
-rw-r--r--internal/parquet/recorder_test.go175
-rw-r--r--internal/parquet/schema.go81
-rw-r--r--internal/parquet/writer.go209
-rw-r--r--internal/parquet/writer_test.go113
5 files changed, 950 insertions, 0 deletions
diff --git a/internal/parquet/recorder.go b/internal/parquet/recorder.go
new file mode 100644
index 0000000..f489b68
--- /dev/null
+++ b/internal/parquet/recorder.go
@@ -0,0 +1,372 @@
+package parquet
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "ior/internal/streamrow"
+)
+
+const (
+ // Defaults substituted by normalizeRecorderConfig for unset (<= 0) fields.
+ defaultRecorderQueueCapacity = 4096
+ defaultRecorderBatchSize = 256
+ defaultRecorderFlushInterval = 250 * time.Millisecond
+)
+
+var (
+ // ErrRecorderActive indicates a start request while a recorder session is running.
+ ErrRecorderActive = errors.New("parquet recorder is already active")
+ // ErrRecorderNotActive indicates that rows cannot be accepted because no session is active.
+ ErrRecorderNotActive = errors.New("parquet recorder is not active")
+ // ErrRecorderQueueFull indicates bounded queue overflow. This is treated as a recording failure.
+ ErrRecorderQueueFull = errors.New("parquet recorder queue is full")
+)
+
+// rowWriter abstracts the parquet file writer so tests can substitute a fake
+// via writerFactory; *Writer in writer.go is the production implementation.
+type rowWriter interface {
+ WriteRows([]Record) error
+ Close() error
+ Abort() error
+ FinalPath() string
+ TempPath() string
+}
+
+// writerFactory builds the rowWriter for one session; overridable in tests.
+type writerFactory func(path string, cfg WriterConfig, meta FileMetadata) (rowWriter, error)
+
+// RecorderConfig controls queueing and batching behavior.
+type RecorderConfig struct {
+ // QueueCapacity bounds the pending-row channel; overflow fails the session.
+ QueueCapacity int
+ // BatchSize is the number of rows flushed to the writer at once.
+ BatchSize int
+ // FlushInterval forces periodic flushes of partial batches.
+ FlushInterval time.Duration
+ Writer WriterConfig
+
+ // newWriter overrides writer construction; nil selects NewWriter.
+ // Unexported so only tests in this package can inject fakes.
+ newWriter writerFactory
+}
+
+// StartOptions supplies per-session metadata.
+type StartOptions struct {
+ Metadata FileMetadata
+}
+
+// Status reports the last known recorder state.
+type Status struct {
+ Active bool
+ Path string
+ // TempPath is cleared after a clean stop; kept on failure so the
+ // leftover temporary file can be located.
+ TempPath string
+ RowsWritten uint64
+ LastError error
+}
+
+// Recorder manages one active parquet recording session at a time.
+type Recorder struct {
+ mu sync.RWMutex
+ config RecorderConfig
+ active *recordingSession
+ status Status
+}
+
+// recordingSession owns the channels shared between producers (Record),
+// the controller (Stop), and the per-session goroutine (runSession).
+type recordingSession struct {
+ queue chan recordRequest
+ stopC chan struct{}
+ doneC chan error
+
+ mu sync.Mutex
+ accepting bool
+ stopCause error
+ stopOnce sync.Once
+}
+
+// recordRequest pairs one stream row with the filter epoch it was captured under.
+type recordRequest struct {
+ row streamrow.Row
+ filterEpoch uint64
+}
+
+// NewRecorder constructs a reusable parquet recorder controller.
+// The supplied configuration is normalized once up front so later sessions
+// always see valid queue, batch, and flush settings.
+func NewRecorder(config RecorderConfig) *Recorder {
+ normalized := normalizeRecorderConfig(config)
+ return &Recorder{config: normalized}
+}
+
+// Start begins a new recording session.
+//
+// The writer (and its temporary file) is created before the lock is taken so
+// file creation stays outside the critical section. If a concurrent Start won
+// the race in the meantime, the freshly created writer is aborted and
+// ErrRecorderActive is returned. On success a background goroutine owns the
+// writer until Stop or a write failure ends the session.
+func (r *Recorder) Start(path string, options StartOptions) error {
+ if r == nil {
+ return ErrRecorderNotActive
+ }
+
+ cfg := normalizeRecorderConfig(r.config)
+ buildWriter := cfg.newWriter
+ if buildWriter == nil {
+ // Production path: write a real parquet file; tests inject fakes.
+ buildWriter = func(path string, cfg WriterConfig, meta FileMetadata) (rowWriter, error) {
+ return NewWriter(path, cfg, meta)
+ }
+ }
+
+ writer, err := buildWriter(path, cfg.Writer, options.Metadata)
+ if err != nil {
+ return err
+ }
+
+ session := newRecordingSession(cfg.QueueCapacity)
+
+ r.mu.Lock()
+ if r.active != nil {
+ // Lost the race against a concurrent Start: discard the temp file.
+ r.mu.Unlock()
+ _ = writer.Abort()
+ return ErrRecorderActive
+ }
+ r.active = session
+ r.status = Status{
+ Active: true,
+ Path: writer.FinalPath(),
+ TempPath: writer.TempPath(),
+ }
+ r.mu.Unlock()
+
+ go r.runSession(session, writer, cfg)
+ return nil
+}
+
+// Record queues one shared stream row for persistence.
+// With no active session it surfaces the previous session's terminal error,
+// if any, so producers learn why recording stopped.
+func (r *Recorder) Record(row streamrow.Row, filterEpoch uint64) error {
+ if r == nil {
+ return ErrRecorderNotActive
+ }
+
+ r.mu.RLock()
+ active, lastErr := r.active, r.status.LastError
+ r.mu.RUnlock()
+
+ if active != nil {
+ return active.enqueue(recordRequest{row: row, filterEpoch: filterEpoch})
+ }
+ if lastErr != nil {
+ return lastErr
+ }
+ return ErrRecorderNotActive
+}
+
+// Stop gracefully flushes and finalizes the active recording session.
+//
+// With no active session it returns the last session's terminal error (nil
+// after a clean stop). Otherwise it signals the session goroutine and blocks
+// until that goroutine reports the session outcome on doneC.
+func (r *Recorder) Stop() error {
+ if r == nil {
+ return nil
+ }
+
+ r.mu.RLock()
+ session := r.active
+ lastErr := r.status.LastError
+ r.mu.RUnlock()
+
+ if session == nil {
+ return lastErr
+ }
+
+ session.stop(nil)
+ return <-session.doneC
+}
+
+// Status returns a snapshot of the recorder state.
+// Safe on a nil receiver, which reports the zero Status.
+func (r *Recorder) Status() Status {
+ if r == nil {
+ return Status{}
+ }
+
+ r.mu.RLock()
+ snapshot := r.status
+ r.mu.RUnlock()
+ return snapshot
+}
+
+// runSession is the per-session goroutine: it batches queued rows, flushes on
+// batch size or on the periodic ticker, and finalizes (or aborts) the writer
+// when the session's stop channel closes. Any write error aborts the session.
+func (r *Recorder) runSession(session *recordingSession, writer rowWriter, cfg RecorderConfig) {
+ ticker := time.NewTicker(cfg.FlushInterval)
+ defer ticker.Stop()
+
+ var written uint64
+ batch := make([]Record, 0, cfg.BatchSize)
+
+ for {
+ select {
+ case req := <-session.queue:
+ if err := r.bufferRecord(session, writer, &batch, &written, cfg.BatchSize, req); err != nil {
+ r.abortSession(session, writer, err)
+ return
+ }
+
+ case <-ticker.C:
+ // Time-based flush so sparse traffic still reaches disk promptly.
+ if err := r.flushBatch(session, writer, &batch, &written); err != nil {
+ r.abortSession(session, writer, err)
+ return
+ }
+
+ case <-session.stopC:
+ r.completeSession(session, r.stopSession(session, writer, &batch, &written, cfg.BatchSize))
+ return
+ }
+ }
+}
+
+// bufferRecord appends one request to the batch and flushes once the batch
+// reaches batchSize.
+func (r *Recorder) bufferRecord(
+ session *recordingSession,
+ writer rowWriter,
+ batch *[]Record,
+ written *uint64,
+ batchSize int,
+ req recordRequest,
+) error {
+ *batch = append(*batch, RecordFromStream(req.row, req.filterEpoch))
+ if len(*batch) >= batchSize {
+ return r.flushBatch(session, writer, batch, written)
+ }
+ return nil
+}
+
+// flushBatch writes any buffered records, publishes the updated row count,
+// and resets the batch while keeping its backing storage for reuse.
+func (r *Recorder) flushBatch(
+ session *recordingSession,
+ writer rowWriter,
+ batch *[]Record,
+ written *uint64,
+) error {
+ pending := *batch
+ if len(pending) == 0 {
+ return nil
+ }
+ if err := writer.WriteRows(pending); err != nil {
+ return err
+ }
+ *written += uint64(len(pending))
+ r.updateRowsWritten(session, *written)
+ *batch = pending[:0]
+ return nil
+}
+
+// stopSession finishes a session whose stopC has closed. A pre-recorded stop
+// cause (write failure or queue overflow) aborts the temp file; otherwise the
+// remaining queued rows are drained, flushed, and the writer is closed, which
+// publishes the final file. Returns the session's terminal error.
+func (r *Recorder) stopSession(
+ session *recordingSession,
+ writer rowWriter,
+ batch *[]Record,
+ written *uint64,
+ batchSize int,
+) error {
+ if cause := session.cause(); cause != nil {
+ _ = writer.Abort()
+ return cause
+ }
+ if err := drainQueue(session, func(req recordRequest) error {
+ return r.bufferRecord(session, writer, batch, written, batchSize, req)
+ }); err != nil {
+ _ = writer.Abort()
+ return err
+ }
+ if err := r.flushBatch(session, writer, batch, written); err != nil {
+ _ = writer.Abort()
+ return err
+ }
+ return writer.Close()
+}
+
+// abortSession records err as the stop cause, discards the temp file, and
+// publishes the failed session outcome to status and any Stop caller.
+func (r *Recorder) abortSession(session *recordingSession, writer rowWriter, err error) {
+ session.stop(err)
+ _ = writer.Abort()
+ r.completeSession(session, err)
+}
+
+// completeSession clears the active session under the recorder lock, then
+// delivers the terminal error to any Stop caller. doneC is buffered (cap 1)
+// so the send never blocks even when nobody is waiting.
+func (r *Recorder) completeSession(session *recordingSession, err error) {
+ r.finishSession(session, err)
+ session.doneC <- err
+ close(session.doneC)
+}
+
+// updateRowsWritten publishes the written row count, but only while the given
+// session is still the active one.
+func (r *Recorder) updateRowsWritten(session *recordingSession, rowsWritten uint64) {
+ r.mu.Lock()
+ if r.active == session {
+ r.status.RowsWritten = rowsWritten
+ }
+ r.mu.Unlock()
+}
+
+// finishSession records the terminal status for a finished session. The
+// identity check makes the update a no-op if a different session has since
+// become active. TempPath is only cleared on success, so the leftover temp
+// file stays discoverable after a failure.
+func (r *Recorder) finishSession(session *recordingSession, err error) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.active != session {
+ return
+ }
+ r.status.Active = false
+ r.status.LastError = err
+ if err == nil {
+ r.status.TempPath = ""
+ }
+ r.active = nil
+}
+
+// normalizeRecorderConfig substitutes defaults for unset fields and
+// normalizes the nested writer configuration.
+func normalizeRecorderConfig(cfg RecorderConfig) RecorderConfig {
+ if cfg.QueueCapacity < 1 {
+ cfg.QueueCapacity = defaultRecorderQueueCapacity
+ }
+ if cfg.BatchSize < 1 {
+ cfg.BatchSize = defaultRecorderBatchSize
+ }
+ if cfg.FlushInterval < 1 {
+ cfg.FlushInterval = defaultRecorderFlushInterval
+ }
+ cfg.Writer = normalizeWriterConfig(cfg.Writer)
+ return cfg
+}
+
+// newRecordingSession builds a session with a bounded queue that is
+// immediately accepting rows.
+func newRecordingSession(queueCapacity int) *recordingSession {
+ session := &recordingSession{
+ queue: make(chan recordRequest, queueCapacity),
+ stopC: make(chan struct{}),
+ doneC: make(chan error, 1),
+ }
+ session.accepting = true
+ return session
+}
+
+// enqueue performs a non-blocking send of req to the session queue.
+//
+// On overflow the session stops accepting further rows, records
+// ErrRecorderQueueFull as the stop cause, and closes stopC so the session
+// goroutine tears down — overflow is a recording failure, not a silent drop.
+func (s *recordingSession) enqueue(req recordRequest) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if !s.accepting {
+ if s.stopCause != nil {
+ return s.stopCause
+ }
+ return ErrRecorderNotActive
+ }
+
+ select {
+ case s.queue <- req:
+ return nil
+ default:
+ s.accepting = false
+ if s.stopCause == nil {
+ s.stopCause = ErrRecorderQueueFull
+ }
+ s.stopOnce.Do(func() { close(s.stopC) })
+ return ErrRecorderQueueFull
+ }
+}
+
+// stop marks the session as no longer accepting rows, records the first
+// non-nil cause, and closes stopC exactly once (stopOnce also guards against
+// the overflow-triggered close in enqueue).
+func (s *recordingSession) stop(cause error) {
+ s.mu.Lock()
+ s.accepting = false
+ if cause != nil && s.stopCause == nil {
+ s.stopCause = cause
+ }
+ s.mu.Unlock()
+ s.stopOnce.Do(func() { close(s.stopC) })
+}
+
+// cause reports the first recorded stop cause, or nil for a graceful stop.
+func (s *recordingSession) cause() error {
+ s.mu.Lock()
+ stopCause := s.stopCause
+ s.mu.Unlock()
+ return stopCause
+}
+
+func drainQueue(session *recordingSession, consume func(recordRequest) error) error {
+ for {
+ select {
+ case req := <-session.queue:
+ if err := consume(req); err != nil {
+ return err
+ }
+ default:
+ return nil
+ }
+ }
+}
diff --git a/internal/parquet/recorder_test.go b/internal/parquet/recorder_test.go
new file mode 100644
index 0000000..d4caa11
--- /dev/null
+++ b/internal/parquet/recorder_test.go
@@ -0,0 +1,175 @@
+package parquet
+
+import (
+ "errors"
+ "path/filepath"
+ "reflect"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "ior/internal/streamrow"
+)
+
+// TestRecorderRoundTrip records three rows through a real writer and checks
+// the finalized file contents plus the published status.
+func TestRecorderRoundTrip(t *testing.T) {
+ // FlushInterval is set far in the future so flushing is driven purely by
+ // BatchSize and the final drain in Stop.
+ recorder := NewRecorder(RecorderConfig{
+ QueueCapacity: 8,
+ BatchSize: 2,
+ FlushInterval: time.Hour,
+ })
+
+ // No ".parquet" suffix on purpose: path normalization should append it.
+ path := filepath.Join(t.TempDir(), "session")
+ if err := recorder.Start(path, StartOptions{Metadata: FileMetadata{Mode: "tui"}}); err != nil {
+ t.Fatalf("Start() error = %v", err)
+ }
+
+ rows := []streamrow.Row{
+ testStreamRow(1, "read", false),
+ testStreamRow(2, "write", false),
+ testStreamRow(3, "openat", true),
+ }
+ epochs := []uint64{0, 0, 3}
+
+ for i := range rows {
+ if err := recorder.Record(rows[i], epochs[i]); err != nil {
+ t.Fatalf("Record(%d) error = %v", i, err)
+ }
+ }
+
+ if err := recorder.Stop(); err != nil {
+ t.Fatalf("Stop() error = %v", err)
+ }
+
+ status := recorder.Status()
+ if status.Active {
+ t.Fatalf("Status().Active = true, want false")
+ }
+ if status.RowsWritten != 3 {
+ t.Fatalf("Status().RowsWritten = %d, want 3", status.RowsWritten)
+ }
+ if status.LastError != nil {
+ t.Fatalf("Status().LastError = %v, want nil", status.LastError)
+ }
+ if status.TempPath != "" {
+ t.Fatalf("Status().TempPath = %q, want empty after successful stop", status.TempPath)
+ }
+
+ want := []Record{
+ RecordFromStream(rows[0], epochs[0]),
+ RecordFromStream(rows[1], epochs[1]),
+ RecordFromStream(rows[2], epochs[2]),
+ }
+ got := readAllRecords(t, status.Path)
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, want)
+ }
+}
+
+// TestRecorderFailsOnQueueOverflow forces the bounded queue to overflow while
+// the writer is blocked, then verifies the session fails with
+// ErrRecorderQueueFull and aborts the backing writer.
+func TestRecorderFailsOnQueueOverflow(t *testing.T) {
+ writer := newBlockingWriter()
+ recorder := NewRecorder(RecorderConfig{
+ QueueCapacity: 1,
+ BatchSize: 1,
+ FlushInterval: time.Hour,
+ newWriter: func(string, WriterConfig, FileMetadata) (rowWriter, error) {
+ return writer, nil
+ },
+ })
+
+ if err := recorder.Start("ignored", StartOptions{}); err != nil {
+ t.Fatalf("Start() error = %v", err)
+ }
+ if err := recorder.Record(testStreamRow(1, "read", false), 0); err != nil {
+ t.Fatalf("first Record() error = %v", err)
+ }
+
+ // Wait until the session goroutine is parked inside WriteRows so later
+ // rows stay queued rather than being consumed.
+ <-writer.started
+
+ // The second row fills the capacity-1 queue; the third overflows it.
+ if err := recorder.Record(testStreamRow(2, "write", false), 0); err != nil {
+ t.Fatalf("second Record() error = %v", err)
+ }
+ if err := recorder.Record(testStreamRow(3, "openat", false), 0); !errors.Is(err, ErrRecorderQueueFull) {
+ t.Fatalf("third Record() error = %v, want %v", err, ErrRecorderQueueFull)
+ }
+
+ writer.releaseWrites()
+
+ if err := recorder.Stop(); !errors.Is(err, ErrRecorderQueueFull) {
+ t.Fatalf("Stop() error = %v, want %v", err, ErrRecorderQueueFull)
+ }
+
+ status := recorder.Status()
+ if status.Active {
+ t.Fatalf("Status().Active = true, want false")
+ }
+ if !errors.Is(status.LastError, ErrRecorderQueueFull) {
+ t.Fatalf("Status().LastError = %v, want %v", status.LastError, ErrRecorderQueueFull)
+ }
+ if !writer.aborted.Load() {
+ t.Fatalf("expected recorder failure path to abort the backing writer")
+ }
+}
+
+// testStreamRow builds a deterministic stream row whose fields are all
+// derived from seq, so expected records are trivial to reconstruct.
+func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row {
+ row := streamrow.Row{
+ Seq: seq,
+ TimeNs: seq * 10,
+ Syscall: syscall,
+ Comm: "ior-test",
+ PID: 100 + uint32(seq),
+ TID: 200 + uint32(seq),
+ FileName: "/tmp/file",
+ DurationNs: seq + 1,
+ GapNs: seq + 2,
+ Bytes: seq + 3,
+ RetVal: int64(seq),
+ IsError: isError,
+ FD: int32(seq),
+ }
+ return row
+}
+
+// blockingWriter is a rowWriter fake whose WriteRows blocks until released,
+// letting tests keep the session goroutine busy while the queue fills.
+type blockingWriter struct {
+ started chan struct{}
+ release chan struct{}
+
+ startOnce sync.Once
+ releaseOnce sync.Once
+
+ aborted atomic.Bool
+}
+
+// newBlockingWriter returns a blockingWriter with unfired signal channels.
+func newBlockingWriter() *blockingWriter {
+ return &blockingWriter{
+ started: make(chan struct{}),
+ release: make(chan struct{}),
+ }
+}
+
+// WriteRows signals the first write, then blocks until releaseWrites.
+func (w *blockingWriter) WriteRows([]Record) error {
+ w.startOnce.Do(func() { close(w.started) })
+ <-w.release
+ return nil
+}
+
+// Close is a no-op; the fake never produces a file.
+func (w *blockingWriter) Close() error {
+ return nil
+}
+
+// Abort records the abort and unblocks any in-flight WriteRows so the
+// session goroutine cannot deadlock during teardown.
+func (w *blockingWriter) Abort() error {
+ w.aborted.Store(true)
+ w.releaseWrites()
+ return nil
+}
+
+// FinalPath returns a placeholder path that is never touched on disk.
+func (w *blockingWriter) FinalPath() string {
+ return "ignored.parquet"
+}
+
+// TempPath returns a placeholder path that is never touched on disk.
+func (w *blockingWriter) TempPath() string {
+ return "ignored.parquet.tmp"
+}
+
+// releaseWrites unblocks WriteRows at most once.
+func (w *blockingWriter) releaseWrites() {
+ w.releaseOnce.Do(func() { close(w.release) })
+}
diff --git a/internal/parquet/schema.go b/internal/parquet/schema.go
new file mode 100644
index 0000000..2ede444
--- /dev/null
+++ b/internal/parquet/schema.go
@@ -0,0 +1,81 @@
+package parquet
+
+import (
+ "strconv"
+
+ "ior/internal/flags"
+ "ior/internal/streamrow"
+
+ parquetgo "github.com/parquet-go/parquet-go"
+)
+
+// Record is the persisted Parquet schema for one syscall stream row.
+// Column names come from the parquet struct tags; FilterEpoch records which
+// filter generation was active when the row was captured.
+type Record struct {
+ Seq uint64 `parquet:"seq"`
+ TimeNS uint64 `parquet:"time_ns"`
+ GapNS uint64 `parquet:"gap_ns"`
+ LatencyNS uint64 `parquet:"latency_ns"`
+ Comm string `parquet:"comm"`
+ PID uint32 `parquet:"pid"`
+ TID uint32 `parquet:"tid"`
+ Syscall string `parquet:"syscall"`
+ FD int32 `parquet:"fd"`
+ Ret int64 `parquet:"ret"`
+ Bytes uint64 `parquet:"bytes"`
+ File string `parquet:"file"`
+ IsError bool `parquet:"is_error"`
+ FilterEpoch uint64 `parquet:"filter_epoch"`
+}
+
+// FileMetadata captures constant metadata written once into the parquet file.
+// Fields are emitted as "ior."-prefixed key/value metadata; empty fields are
+// omitted (see writerMetadataOptions) and IORVersion defaults to flags.Version.
+type FileMetadata struct {
+ Hostname string
+ StartedAtUnixNano uint64
+ Mode string
+ IORVersion string
+}
+
+// RecordFromStream converts one shared stream row into the persisted format,
+// stamping it with the filter epoch it was captured under.
+func RecordFromStream(row streamrow.Row, filterEpoch uint64) Record {
+ rec := Record{
+ Seq: row.Seq,
+ TimeNS: row.TimeNs,
+ GapNS: row.GapNs,
+ LatencyNS: row.DurationNs,
+ Comm: row.Comm,
+ PID: row.PID,
+ TID: row.TID,
+ Syscall: row.Syscall,
+ FD: row.FD,
+ Ret: row.RetVal,
+ Bytes: row.Bytes,
+ File: row.FileName,
+ IsError: row.IsError,
+ }
+ rec.FilterEpoch = filterEpoch
+ return rec
+}
+
+// writerMetadataOptions renders the non-empty metadata fields as parquet
+// key/value pairs under the "ior." prefix, preserving a fixed key order.
+func writerMetadataOptions(meta FileMetadata) []parquetgo.WriterOption {
+ meta = normalizeMetadata(meta)
+
+ startedAt := ""
+ if meta.StartedAtUnixNano != 0 {
+ startedAt = strconv.FormatUint(meta.StartedAtUnixNano, 10)
+ }
+
+ pairs := [][2]string{
+ {"ior.hostname", meta.Hostname},
+ {"ior.started_at_unix_nano", startedAt},
+ {"ior.mode", meta.Mode},
+ {"ior.version", meta.IORVersion},
+ }
+
+ options := make([]parquetgo.WriterOption, 0, len(pairs))
+ for _, kv := range pairs {
+ if kv[1] != "" {
+ options = append(options, parquetgo.KeyValueMetadata(kv[0], kv[1]))
+ }
+ }
+ return options
+}
+
+// normalizeMetadata backfills the build version when the caller left it empty.
+func normalizeMetadata(meta FileMetadata) FileMetadata {
+ if meta.IORVersion != "" {
+ return meta
+ }
+ meta.IORVersion = flags.Version
+ return meta
+}
diff --git a/internal/parquet/writer.go b/internal/parquet/writer.go
new file mode 100644
index 0000000..82428d9
--- /dev/null
+++ b/internal/parquet/writer.go
@@ -0,0 +1,209 @@
+package parquet
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+
+ parquetgo "github.com/parquet-go/parquet-go"
+)
+
+const (
+ // Layout defaults applied when WriterConfig fields are unset.
+ defaultMaxRowsPerRowGroup = int64(8192)
+ defaultPageBufferSize = 256 * 1024
+)
+
+// errWriterClosed is returned by WriteRows after Close or Abort.
+var errWriterClosed = errors.New("parquet writer is closed")
+
+// WriterConfig tunes parquet file layout details.
+type WriterConfig struct {
+ MaxRowsPerRowGroup int64
+ PageBufferSize int
+}
+
+// DefaultWriterConfig returns the recommended writer defaults for ior traces.
+func DefaultWriterConfig() WriterConfig {
+ return WriterConfig{
+ MaxRowsPerRowGroup: defaultMaxRowsPerRowGroup,
+ PageBufferSize: defaultPageBufferSize,
+ }
+}
+
+// writerState tracks the writer lifecycle: open until Close or Abort wins.
+type writerState int
+
+const (
+ writerStateOpen writerState = iota
+ writerStateClosed
+ writerStateAborted
+)
+
+// Writer wraps the parquet library behind repo-local file lifecycle semantics.
+// All fields except the mu-guarded state are immutable after NewWriter.
+type Writer struct {
+ mu sync.Mutex
+
+ finalPath string
+ tempPath string
+ file *os.File
+ writer *parquetgo.GenericWriter[Record]
+ state writerState
+}
+
+// NewWriter creates a new parquet writer that writes to a temporary file first
+// and only publishes the final path once Close succeeds.
+//
+// Rows are zstd-compressed; row-group and page sizes come from cfg (defaults
+// filled in by normalizeWriterConfig) and the session metadata is embedded as
+// key/value pairs in the file footer.
+func NewWriter(path string, cfg WriterConfig, meta FileMetadata) (*Writer, error) {
+ finalPath, tempPath, err := normalizeOutputPaths(path)
+ if err != nil {
+ return nil, err
+ }
+
+ cfg = normalizeWriterConfig(cfg)
+ file, err := os.Create(tempPath)
+ if err != nil {
+ return nil, err
+ }
+
+ options := []parquetgo.WriterOption{
+ parquetgo.Compression(&parquetgo.Zstd),
+ parquetgo.CreatedBy("ior", normalizeMetadata(meta).IORVersion, ""),
+ parquetgo.MaxRowsPerRowGroup(cfg.MaxRowsPerRowGroup),
+ parquetgo.PageBufferSize(cfg.PageBufferSize),
+ }
+ options = append(options, writerMetadataOptions(meta)...)
+
+ return &Writer{
+ finalPath: finalPath,
+ tempPath: tempPath,
+ file: file,
+ writer: parquetgo.NewGenericWriter[Record](file, options...),
+ state: writerStateOpen,
+ }, nil
+}
+
+// FinalPath returns the finalized parquet path.
+// Safe on a nil receiver (returns ""); the value is immutable after NewWriter.
+func (w *Writer) FinalPath() string {
+ if w == nil {
+ return ""
+ }
+ return w.finalPath
+}
+
+// TempPath returns the temporary parquet path used before finalization.
+// Safe on a nil receiver (returns ""); the value is immutable after NewWriter.
+func (w *Writer) TempPath() string {
+ if w == nil {
+ return ""
+ }
+ return w.tempPath
+}
+
+// WriteRows appends a batch of records to the parquet file. An empty batch is
+// a no-op; writing after Close or Abort returns errWriterClosed, and a short
+// write is reported as an error.
+func (w *Writer) WriteRows(rows []Record) error {
+ if len(rows) == 0 {
+ return nil
+ }
+
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ if w.state != writerStateOpen {
+ return errWriterClosed
+ }
+
+ switch written, err := w.writer.Write(rows); {
+ case err != nil:
+ return fmt.Errorf("write parquet rows: %w", err)
+ case written != len(rows):
+ return fmt.Errorf("write parquet rows: wrote %d of %d rows", written, len(rows))
+ default:
+ return nil
+ }
+}
+
+// Close finalizes the parquet footer and publishes the file atomically.
+//
+// The state transition happens under the lock, but the slow footer write,
+// file close, and rename run outside it. On failure the temp file is removed
+// and all cleanup errors are joined into the returned error. Closing an
+// already closed or aborted writer is a no-op.
+func (w *Writer) Close() error {
+ if w == nil {
+ return nil
+ }
+
+ w.mu.Lock()
+ if w.state != writerStateOpen {
+ w.mu.Unlock()
+ return nil
+ }
+ file := w.file
+ writer := w.writer
+ tempPath := w.tempPath
+ finalPath := w.finalPath
+ w.state = writerStateClosed
+ w.mu.Unlock()
+
+ if err := writer.Close(); err != nil {
+ closeErr := file.Close()
+ removeErr := os.Remove(tempPath)
+ return errors.Join(fmt.Errorf("close parquet writer: %w", err), closeErr, removeErr)
+ }
+ if err := file.Close(); err != nil {
+ removeErr := os.Remove(tempPath)
+ return errors.Join(fmt.Errorf("close parquet file: %w", err), removeErr)
+ }
+ if err := os.Rename(tempPath, finalPath); err != nil {
+ return fmt.Errorf("rename parquet file %q to %q: %w", tempPath, finalPath, err)
+ }
+ return nil
+}
+
+// Abort discards the temporary parquet file.
+// A missing temp file is not treated as an error, and aborting an already
+// closed or aborted writer is a no-op.
+func (w *Writer) Abort() error {
+ if w == nil {
+ return nil
+ }
+
+ w.mu.Lock()
+ if w.state != writerStateOpen {
+ w.mu.Unlock()
+ return nil
+ }
+ file := w.file
+ tempPath := w.tempPath
+ w.state = writerStateAborted
+ w.mu.Unlock()
+
+ closeErr := file.Close()
+ removeErr := os.Remove(tempPath)
+ if errors.Is(removeErr, os.ErrNotExist) {
+ removeErr = nil
+ }
+ return errors.Join(closeErr, removeErr)
+}
+
+// normalizeWriterConfig fills unset layout fields from DefaultWriterConfig.
+func normalizeWriterConfig(cfg WriterConfig) WriterConfig {
+ defaults := DefaultWriterConfig()
+ if cfg.MaxRowsPerRowGroup < 1 {
+ cfg.MaxRowsPerRowGroup = defaults.MaxRowsPerRowGroup
+ }
+ if cfg.PageBufferSize < 1 {
+ cfg.PageBufferSize = defaults.PageBufferSize
+ }
+ return cfg
+}
+
+// normalizeOutputPaths derives the final ".parquet" path and the temporary
+// ".parquet.tmp" staging path from a user-supplied output path. Suffix
+// matching is case-insensitive, so e.g. "TRACE.PARQUET" is honored as-is.
+// An empty (or whitespace-only) path is rejected.
+func normalizeOutputPaths(path string) (string, string, error) {
+ clean := filepath.Clean(strings.TrimSpace(path))
+ if clean == "." || clean == "" {
+ return "", "", errors.New("parquet output path cannot be empty")
+ }
+
+ lower := strings.ToLower(clean)
+ switch {
+ case strings.HasSuffix(lower, ".parquet.tmp"):
+ // Trim by length instead of strings.TrimSuffix: the suffix was
+ // matched case-insensitively, so "X.PARQUET.TMP" must still lose
+ // its ".TMP" tail (TrimSuffix with ".tmp" would not trim it,
+ // leaving the final path equal to the temp path).
+ finalPath := clean[:len(clean)-len(".tmp")]
+ return finalPath, clean, nil
+ case strings.HasSuffix(lower, ".parquet"):
+ return clean, clean + ".tmp", nil
+ default:
+ finalPath := clean + ".parquet"
+ return finalPath, finalPath + ".tmp", nil
+ }
+}
diff --git a/internal/parquet/writer_test.go b/internal/parquet/writer_test.go
new file mode 100644
index 0000000..0bd7d97
--- /dev/null
+++ b/internal/parquet/writer_test.go
@@ -0,0 +1,113 @@
+package parquet
+
+import (
+ "io"
+ "os"
+ "path/filepath"
+ "reflect"
+ "testing"
+
+ parquetgo "github.com/parquet-go/parquet-go"
+)
+
+// TestWriterRoundTripAndFinalize writes two records, verifies the temp-file
+// lifecycle around Close, and reads the finalized file back for comparison.
+func TestWriterRoundTripAndFinalize(t *testing.T) {
+ dir := t.TempDir()
+ // Path has no ".parquet" suffix; NewWriter is expected to append it.
+ writer, err := NewWriter(filepath.Join(dir, "trace"), WriterConfig{}, FileMetadata{
+ Hostname: "test-host",
+ StartedAtUnixNano: 1234,
+ Mode: "tui",
+ })
+ if err != nil {
+ t.Fatalf("NewWriter() error = %v", err)
+ }
+
+ rows := []Record{
+ {
+ Seq: 1,
+ TimeNS: 10,
+ GapNS: 2,
+ LatencyNS: 5,
+ Comm: "cat",
+ PID: 11,
+ TID: 12,
+ Syscall: "read",
+ FD: 3,
+ Ret: 42,
+ Bytes: 42,
+ File: "/tmp/input",
+ IsError: false,
+ FilterEpoch: 7,
+ },
+ {
+ Seq: 2,
+ TimeNS: 20,
+ GapNS: 3,
+ LatencyNS: 6,
+ Comm: "cp",
+ PID: 21,
+ TID: 22,
+ Syscall: "write",
+ FD: 4,
+ Ret: -1,
+ Bytes: 99,
+ File: "/tmp/output",
+ IsError: true,
+ FilterEpoch: 8,
+ },
+ }
+
+ if err := writer.WriteRows(rows); err != nil {
+ t.Fatalf("WriteRows() error = %v", err)
+ }
+ // Before Close: only the temp file may exist.
+ if _, err := os.Stat(writer.TempPath()); err != nil {
+ t.Fatalf("Stat(%q) error = %v, want temp file present", writer.TempPath(), err)
+ }
+ if _, err := os.Stat(writer.FinalPath()); !os.IsNotExist(err) {
+ t.Fatalf("Stat(%q) error = %v, want not-exist before Close", writer.FinalPath(), err)
+ }
+
+ if err := writer.Close(); err != nil {
+ t.Fatalf("Close() error = %v", err)
+ }
+
+ // After Close: the temp file is renamed onto the final path.
+ if _, err := os.Stat(writer.TempPath()); !os.IsNotExist(err) {
+ t.Fatalf("Stat(%q) error = %v, want temp removed after Close", writer.TempPath(), err)
+ }
+ if _, err := os.Stat(writer.FinalPath()); err != nil {
+ t.Fatalf("Stat(%q) error = %v, want finalized parquet file", writer.FinalPath(), err)
+ }
+
+ got := readAllRecords(t, writer.FinalPath())
+ if !reflect.DeepEqual(got, rows) {
+ t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, rows)
+ }
+}
+
+// readAllRecords decodes every Record from the parquet file at path, failing
+// the test on any error. The deliberately small buffer exercises multiple
+// Read calls against the generic reader.
+func readAllRecords(t *testing.T, path string) []Record {
+ t.Helper()
+
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("Open(%q) error = %v", path, err)
+ }
+ defer f.Close()
+
+ reader := parquetgo.NewGenericReader[Record](f)
+ defer reader.Close()
+
+ var rows []Record
+ buf := make([]Record, 4)
+ for {
+ n, err := reader.Read(buf)
+ if n > 0 {
+ rows = append(rows, buf[:n]...)
+ }
+ // nil error means more rows may follow; io.EOF ends the stream.
+ if err == nil {
+ continue
+ }
+ if err == io.EOF {
+ return rows
+ }
+ t.Fatalf("Read() error = %v", err)
+ }
+}