diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:47:23 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:47:23 +0200 |
| commit | 767c2b54779cbf81b68217c6e83868cffb6a0965 (patch) | |
| tree | d657bcb88f22357f506a5134227d87958190f07a /internal | |
| parent | 775d3e59c7a6c060d0a9ecf3536c0df383d241be (diff) | |
feat: add parquet recorder foundation
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/parquet/recorder.go | 372 | ||||
| -rw-r--r-- | internal/parquet/recorder_test.go | 175 | ||||
| -rw-r--r-- | internal/parquet/schema.go | 81 | ||||
| -rw-r--r-- | internal/parquet/writer.go | 209 | ||||
| -rw-r--r-- | internal/parquet/writer_test.go | 113 |
5 files changed, 950 insertions, 0 deletions
package parquet

import (
	"errors"
	"sync"
	"time"

	"ior/internal/streamrow"
)

// Defaults applied by normalizeRecorderConfig when the corresponding
// RecorderConfig field is unset (<= 0).
const (
	defaultRecorderQueueCapacity = 4096
	defaultRecorderBatchSize     = 256
	defaultRecorderFlushInterval = 250 * time.Millisecond
)

var (
	// ErrRecorderActive indicates a start request while a recorder session is running.
	ErrRecorderActive = errors.New("parquet recorder is already active")
	// ErrRecorderNotActive indicates that rows cannot be accepted because no session is active.
	ErrRecorderNotActive = errors.New("parquet recorder is not active")
	// ErrRecorderQueueFull indicates bounded queue overflow. This is treated as a recording failure.
	ErrRecorderQueueFull = errors.New("parquet recorder queue is full")
)

// rowWriter abstracts the concrete *Writer so tests can substitute a fake
// (see blockingWriter in recorder_test.go).
type rowWriter interface {
	WriteRows([]Record) error
	Close() error
	Abort() error
	FinalPath() string
	TempPath() string
}

// writerFactory builds the backing rowWriter for a session; overridden in
// tests via the unexported RecorderConfig.newWriter hook.
type writerFactory func(path string, cfg WriterConfig, meta FileMetadata) (rowWriter, error)

// RecorderConfig controls queueing and batching behavior.
type RecorderConfig struct {
	QueueCapacity int
	BatchSize     int
	FlushInterval time.Duration
	Writer        WriterConfig

	// newWriter is a test seam; nil means "use NewWriter".
	newWriter writerFactory
}

// StartOptions supplies per-session metadata.
type StartOptions struct {
	Metadata FileMetadata
}

// Status reports the last known recorder state.
type Status struct {
	Active      bool
	Path        string
	TempPath    string
	RowsWritten uint64
	LastError   error
}

// Recorder manages one active parquet recording session at a time. 
type Recorder struct {
	mu     sync.RWMutex
	config RecorderConfig
	active *recordingSession // nil when no session is running
	status Status            // snapshot returned by Status(); guarded by mu
}

// recordingSession owns the per-session queue and shutdown signalling.
type recordingSession struct {
	queue chan recordRequest
	stopC chan struct{} // closed exactly once (via stopOnce) to end runSession
	doneC chan error    // buffered(1); receives the session's final error

	mu        sync.Mutex
	accepting bool  // false once stop/overflow; enqueue rejects afterwards
	stopCause error // first fatal cause, sticky
	stopOnce  sync.Once
}

type recordRequest struct {
	row         streamrow.Row
	filterEpoch uint64
}

// NewRecorder constructs a reusable parquet recorder controller.
func NewRecorder(config RecorderConfig) *Recorder {
	return &Recorder{config: normalizeRecorderConfig(config)}
}

// Start begins a new recording session.
// Returns ErrRecorderActive if a session is already running; in that case the
// freshly created writer (and its temp file) is aborted before returning.
func (r *Recorder) Start(path string, options StartOptions) error {
	if r == nil {
		return ErrRecorderNotActive
	}

	cfg := normalizeRecorderConfig(r.config)
	buildWriter := cfg.newWriter
	if buildWriter == nil {
		buildWriter = func(path string, cfg WriterConfig, meta FileMetadata) (rowWriter, error) {
			return NewWriter(path, cfg, meta)
		}
	}

	// The writer is created before taking the lock so file creation I/O does
	// not happen under r.mu; the losing racer aborts its writer below.
	writer, err := buildWriter(path, cfg.Writer, options.Metadata)
	if err != nil {
		return err
	}

	session := newRecordingSession(cfg.QueueCapacity)

	r.mu.Lock()
	if r.active != nil {
		r.mu.Unlock()
		_ = writer.Abort()
		return ErrRecorderActive
	}
	r.active = session
	r.status = Status{
		Active:   true,
		Path:     writer.FinalPath(),
		TempPath: writer.TempPath(),
	}
	r.mu.Unlock()

	go r.runSession(session, writer, cfg)
	return nil
}

// Record queues one shared stream row for persistence.
// When no session is active it returns the previous session's terminal error,
// if any, so producers observe why recording stopped.
func (r *Recorder) Record(row streamrow.Row, filterEpoch uint64) error {
	if r == nil {
		return ErrRecorderNotActive
	}

	r.mu.RLock()
	session := r.active
	lastErr := r.status.LastError
	r.mu.RUnlock()

	if session == nil {
		if lastErr != nil {
			return lastErr
		}
		return ErrRecorderNotActive
	}
	return session.enqueue(recordRequest{row: row, filterEpoch: filterEpoch})
}

// Stop gracefully flushes and finalizes the active recording session. 
func (r *Recorder) Stop() error {
	if r == nil {
		return nil
	}

	r.mu.RLock()
	session := r.active
	lastErr := r.status.LastError
	r.mu.RUnlock()

	// Stop after a failed/finished session reports the terminal error again.
	if session == nil {
		return lastErr
	}

	session.stop(nil)
	// Wait for runSession to drain, flush, and close (or abort) the writer.
	return <-session.doneC
}

// Status returns a snapshot of the recorder state.
func (r *Recorder) Status() Status {
	if r == nil {
		return Status{}
	}

	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.status
}

// runSession is the single goroutine that owns the write side of a session:
// it batches queued rows, flushes on a timer, and finalizes on stop.
func (r *Recorder) runSession(session *recordingSession, writer rowWriter, cfg RecorderConfig) {
	ticker := time.NewTicker(cfg.FlushInterval)
	defer ticker.Stop()

	var written uint64
	batch := make([]Record, 0, cfg.BatchSize)

	for {
		select {
		case req := <-session.queue:
			if err := r.bufferRecord(session, writer, &batch, &written, cfg.BatchSize, req); err != nil {
				r.abortSession(session, writer, err)
				return
			}

		case <-ticker.C:
			// Periodic flush bounds how long rows sit unpersisted in memory.
			if err := r.flushBatch(session, writer, &batch, &written); err != nil {
				r.abortSession(session, writer, err)
				return
			}

		case <-session.stopC:
			r.completeSession(session, r.stopSession(session, writer, &batch, &written, cfg.BatchSize))
			return
		}
	}
}

// bufferRecord appends one request to the batch and flushes once the batch
// reaches batchSize. Only called from the runSession goroutine.
func (r *Recorder) bufferRecord(
	session *recordingSession,
	writer rowWriter,
	batch *[]Record,
	written *uint64,
	batchSize int,
	req recordRequest,
) error {
	*batch = append(*batch, RecordFromStream(req.row, req.filterEpoch))
	if len(*batch) < batchSize {
		return nil
	}
	return r.flushBatch(session, writer, batch, written)
}

// flushBatch writes the pending batch (if any), updates the rows-written
// counter in the published status, and resets the batch in place.
func (r *Recorder) flushBatch(
	session *recordingSession,
	writer rowWriter,
	batch *[]Record,
	written *uint64,
) error {
	if len(*batch) == 0 {
		return nil
	}
	if err := writer.WriteRows(*batch); err != nil {
		return err
	}
	*written += uint64(len(*batch))
	r.updateRowsWritten(session, *written)
	*batch = (*batch)[:0]
	return nil
}

// stopSession finalizes a stopping session: on a recorded failure cause it
// aborts; otherwise it drains remaining queued rows, flushes, and closes the
// writer (publishing the final file).
func (r *Recorder) stopSession(
	session *recordingSession,
	writer rowWriter,
	batch *[]Record,
	written *uint64,
	batchSize int,
) error {
	if cause := session.cause(); cause != nil {
		_ = writer.Abort()
		return cause
	}
	if err := drainQueue(session, func(req recordRequest) error {
		return r.bufferRecord(session, writer, batch, written, batchSize, req)
	}); err != nil {
		_ = writer.Abort()
		return err
	}
	if err := r.flushBatch(session, writer, batch, written); err != nil {
		_ = writer.Abort()
		return err
	}
	return writer.Close()
}

// abortSession records err as the session cause, discards the temp file, and
// completes the session with err.
func (r *Recorder) abortSession(session *recordingSession, writer rowWriter, err error) {
	session.stop(err)
	_ = writer.Abort()
	r.completeSession(session, err)
}

// completeSession publishes the final status and delivers err to any Stop
// waiter. doneC is buffered, so the send never blocks; the close lets later
// receivers observe a zero error.
func (r *Recorder) completeSession(session *recordingSession, err error) {
	r.finishSession(session, err)
	session.doneC <- err
	close(session.doneC)
}

func (r *Recorder) updateRowsWritten(session *recordingSession, rowsWritten uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Guard against a stale session updating status after a newer Start.
	if r.active != session {
		return
	}
	r.status.RowsWritten = rowsWritten
}

// finishSession clears the active session and records the terminal error.
// TempPath is kept on failure so callers can locate the leftover temp file.
func (r *Recorder) finishSession(session *recordingSession, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.active != session {
		return
	}
	r.status.Active = false
	r.status.LastError = err
	if err == nil {
		r.status.TempPath = ""
	}
	r.active = nil
}

// normalizeRecorderConfig fills unset fields with package defaults.
func normalizeRecorderConfig(cfg RecorderConfig) RecorderConfig {
	if cfg.QueueCapacity <= 0 {
		cfg.QueueCapacity = defaultRecorderQueueCapacity
	}
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = defaultRecorderBatchSize
	}
	if cfg.FlushInterval <= 0 {
		cfg.FlushInterval = defaultRecorderFlushInterval
	}
	cfg.Writer = normalizeWriterConfig(cfg.Writer)
	return cfg
}

func newRecordingSession(queueCapacity int) *recordingSession {
	return &recordingSession{
		queue:     make(chan recordRequest, queueCapacity),
		stopC:     make(chan struct{}),
		doneC:     make(chan error, 1),
		accepting: true,
	}
}

// enqueue attempts a non-blocking send. A full queue is treated as a fatal
// recording failure: the session stops accepting, ErrRecorderQueueFull is
// recorded as the stop cause, and the session is signalled to stop.
func (s *recordingSession) enqueue(req recordRequest) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.accepting {
		if s.stopCause != nil {
			return s.stopCause
		}
		return ErrRecorderNotActive
	}

	select {
	case s.queue <- req:
		return nil
	default:
		s.accepting = false
		if s.stopCause == nil {
			s.stopCause = ErrRecorderQueueFull
		}
		s.stopOnce.Do(func() { close(s.stopC) })
		return ErrRecorderQueueFull
	}
}

// stop marks the session as no longer accepting, records the first non-nil
// cause, and closes stopC exactly once.
func (s *recordingSession) stop(cause error) {
	s.mu.Lock()
	s.accepting = false
	if cause != nil && s.stopCause == nil {
		s.stopCause = cause
	}
	s.mu.Unlock()
	s.stopOnce.Do(func() { close(s.stopC) })
}

func (s *recordingSession) cause() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.stopCause
}

// drainQueue consumes queued requests until the queue is momentarily empty.
func drainQueue(session *recordingSession, consume func(recordRequest) error) error {
	for {
		select {
		case req := <-session.queue:
			if err := consume(req); err != nil {
				return err
			}
		default:
			return nil
		}
	}
}

// ---- file: internal/parquet/recorder_test.go ----

package parquet

import (
	"errors"
	"path/filepath"
	"reflect"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"ior/internal/streamrow"
)

// TestRecorderRoundTrip records three rows through a real writer and verifies
// the finalized parquet file and the published status.
func TestRecorderRoundTrip(t *testing.T) {
	recorder := NewRecorder(RecorderConfig{
		QueueCapacity: 8,
		BatchSize:     2,
		FlushInterval: time.Hour, // effectively disable timer flushes
	})

	path := filepath.Join(t.TempDir(), "session")
	if err := recorder.Start(path, StartOptions{Metadata: FileMetadata{Mode: "tui"}}); err != nil {
		t.Fatalf("Start() error = %v", err)
	}

	rows := []streamrow.Row{
		testStreamRow(1, "read", false),
		testStreamRow(2, "write", false),
		testStreamRow(3, "openat", true),
	}
	epochs := []uint64{0, 0, 3}

	for i := range rows {
		if err := recorder.Record(rows[i], epochs[i]); err != nil {
			t.Fatalf("Record(%d) error = %v", i, err)
		}
	}

	if err := recorder.Stop(); err != nil {
		t.Fatalf("Stop() error = %v", err)
	}

	status := recorder.Status()
	if status.Active {
		t.Fatalf("Status().Active = true, want false")
	}
	if status.RowsWritten != 3 {
		t.Fatalf("Status().RowsWritten = %d, want 3", status.RowsWritten)
	}
	if status.LastError != nil {
		t.Fatalf("Status().LastError = %v, want nil", status.LastError)
	}
	if status.TempPath != "" {
		t.Fatalf("Status().TempPath = %q, want empty after successful stop", status.TempPath)
	}

	want := []Record{
		RecordFromStream(rows[0], epochs[0]),
		RecordFromStream(rows[1], epochs[1]),
		RecordFromStream(rows[2], epochs[2]),
	}
	got := readAllRecords(t, status.Path)
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, want)
	}
}

// TestRecorderFailsOnQueueOverflow blocks the writer so the 1-slot queue
// overflows, then verifies the session fails with ErrRecorderQueueFull and
// aborts the backing writer.
func TestRecorderFailsOnQueueOverflow(t *testing.T) {
	writer := newBlockingWriter()
	recorder := NewRecorder(RecorderConfig{
		QueueCapacity: 1,
		BatchSize:     1,
		FlushInterval: time.Hour,
		newWriter: func(string, WriterConfig, FileMetadata) (rowWriter, error) {
			return writer, nil
		},
	})

	if err := recorder.Start("ignored", StartOptions{}); err != nil {
		t.Fatalf("Start() error = %v", err)
	}
	if err := recorder.Record(testStreamRow(1, "read", false), 0); err != nil {
		t.Fatalf("first Record() error = %v", err)
	}

	// Wait until the first row is inside the (blocked) WriteRows call, so the
	// next Record fills the queue and the one after overflows it.
	<-writer.started

	if err := recorder.Record(testStreamRow(2, "write", false), 0); err != nil {
		t.Fatalf("second Record() error = %v", err)
	}
	if err := recorder.Record(testStreamRow(3, "openat", false), 0); !errors.Is(err, ErrRecorderQueueFull) {
		t.Fatalf("third Record() error = %v, want %v", err, ErrRecorderQueueFull)
	}

	writer.releaseWrites()

	if err := recorder.Stop(); !errors.Is(err, ErrRecorderQueueFull) {
		t.Fatalf("Stop() error = %v, want %v", err, ErrRecorderQueueFull)
	}

	status := recorder.Status()
	if status.Active {
		t.Fatalf("Status().Active = true, want false")
	}
	if !errors.Is(status.LastError, ErrRecorderQueueFull) {
		t.Fatalf("Status().LastError = %v, want %v", status.LastError, ErrRecorderQueueFull)
	}
	if !writer.aborted.Load() {
		t.Fatalf("expected recorder failure path to abort the backing writer")
	}
}

// testStreamRow builds a deterministic stream row derived from seq.
func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row {
	return streamrow.Row{
		Seq:        seq,
		TimeNs:     seq * 10,
		Syscall:    syscall,
		Comm:       "ior-test",
		PID:        100 + uint32(seq),
		TID:        200 + uint32(seq),
		FileName:   "/tmp/file",
		DurationNs: seq + 1,
		GapNs:      seq + 2,
		Bytes:      seq + 3,
		RetVal:     int64(seq),
		IsError:    isError,
		FD:         int32(seq),
	}
}

// blockingWriter is a rowWriter fake whose first WriteRows blocks until
// releaseWrites (or Abort) is called; it records whether Abort happened.
type blockingWriter struct {
	started chan struct{} // closed when the first WriteRows begins
	release chan struct{} // closed to unblock WriteRows

	startOnce   sync.Once
	releaseOnce sync.Once

	aborted atomic.Bool
}

func newBlockingWriter() *blockingWriter {
	return &blockingWriter{
		started: make(chan struct{}),
		release: make(chan struct{}),
	}
}

func (w *blockingWriter) WriteRows([]Record) error {
	w.startOnce.Do(func() { close(w.started) })
	<-w.release
	return nil
}

func (w *blockingWriter) Close() error {
	return nil
}

func (w *blockingWriter) Abort() error {
	w.aborted.Store(true)
	// Unblock any in-flight WriteRows so the session goroutine can exit.
	w.releaseWrites()
	return nil
}

func (w *blockingWriter) FinalPath() string {
	return "ignored.parquet"
}

func (w *blockingWriter) TempPath() string {
	return "ignored.parquet.tmp"
}

func (w *blockingWriter) releaseWrites() {
	w.releaseOnce.Do(func() { close(w.release) })
}

// ---- file: internal/parquet/schema.go ----

package parquet

import (
	"strconv"

	"ior/internal/flags"
	"ior/internal/streamrow"

	parquetgo "github.com/parquet-go/parquet-go"
)

// Record is the persisted Parquet schema for one syscall stream row. 
+type Record struct { + Seq uint64 `parquet:"seq"` + TimeNS uint64 `parquet:"time_ns"` + GapNS uint64 `parquet:"gap_ns"` + LatencyNS uint64 `parquet:"latency_ns"` + Comm string `parquet:"comm"` + PID uint32 `parquet:"pid"` + TID uint32 `parquet:"tid"` + Syscall string `parquet:"syscall"` + FD int32 `parquet:"fd"` + Ret int64 `parquet:"ret"` + Bytes uint64 `parquet:"bytes"` + File string `parquet:"file"` + IsError bool `parquet:"is_error"` + FilterEpoch uint64 `parquet:"filter_epoch"` +} + +// FileMetadata captures constant metadata written once into the parquet file. +type FileMetadata struct { + Hostname string + StartedAtUnixNano uint64 + Mode string + IORVersion string +} + +// RecordFromStream converts one shared stream row into the persisted format. +func RecordFromStream(row streamrow.Row, filterEpoch uint64) Record { + return Record{ + Seq: row.Seq, + TimeNS: row.TimeNs, + GapNS: row.GapNs, + LatencyNS: row.DurationNs, + Comm: row.Comm, + PID: row.PID, + TID: row.TID, + Syscall: row.Syscall, + FD: row.FD, + Ret: row.RetVal, + Bytes: row.Bytes, + File: row.FileName, + IsError: row.IsError, + FilterEpoch: filterEpoch, + } +} + +func writerMetadataOptions(meta FileMetadata) []parquetgo.WriterOption { + meta = normalizeMetadata(meta) + options := make([]parquetgo.WriterOption, 0, 4) + if meta.Hostname != "" { + options = append(options, parquetgo.KeyValueMetadata("ior.hostname", meta.Hostname)) + } + if meta.StartedAtUnixNano != 0 { + options = append(options, parquetgo.KeyValueMetadata("ior.started_at_unix_nano", strconv.FormatUint(meta.StartedAtUnixNano, 10))) + } + if meta.Mode != "" { + options = append(options, parquetgo.KeyValueMetadata("ior.mode", meta.Mode)) + } + if meta.IORVersion != "" { + options = append(options, parquetgo.KeyValueMetadata("ior.version", meta.IORVersion)) + } + return options +} + +func normalizeMetadata(meta FileMetadata) FileMetadata { + if meta.IORVersion == "" { + meta.IORVersion = flags.Version + } + return meta +} diff --git 
package parquet

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"

	parquetgo "github.com/parquet-go/parquet-go"
)

// Defaults applied by normalizeWriterConfig for unset WriterConfig fields.
const (
	defaultMaxRowsPerRowGroup = int64(8192)
	defaultPageBufferSize     = 256 * 1024
)

var errWriterClosed = errors.New("parquet writer is closed")

// WriterConfig tunes parquet file layout details.
type WriterConfig struct {
	MaxRowsPerRowGroup int64
	PageBufferSize     int
}

// DefaultWriterConfig returns the recommended writer defaults for ior traces.
func DefaultWriterConfig() WriterConfig {
	return WriterConfig{
		MaxRowsPerRowGroup: defaultMaxRowsPerRowGroup,
		PageBufferSize:     defaultPageBufferSize,
	}
}

// writerState tracks the writer lifecycle; transitions are one-way
// (open -> closed or open -> aborted) and guarded by Writer.mu.
type writerState int

const (
	writerStateOpen writerState = iota
	writerStateClosed
	writerStateAborted
)

// Writer wraps the parquet library behind repo-local file lifecycle semantics.
type Writer struct {
	mu sync.Mutex

	finalPath string
	tempPath  string
	file      *os.File
	writer    *parquetgo.GenericWriter[Record]
	state     writerState
}

// NewWriter creates a new parquet writer that writes to a temporary file first
// and only publishes the final path once Close succeeds.
func NewWriter(path string, cfg WriterConfig, meta FileMetadata) (*Writer, error) {
	finalPath, tempPath, err := normalizeOutputPaths(path)
	if err != nil {
		return nil, err
	}

	cfg = normalizeWriterConfig(cfg)
	file, err := os.Create(tempPath)
	if err != nil {
		return nil, err
	}

	options := []parquetgo.WriterOption{
		parquetgo.Compression(&parquetgo.Zstd),
		parquetgo.CreatedBy("ior", normalizeMetadata(meta).IORVersion, ""),
		parquetgo.MaxRowsPerRowGroup(cfg.MaxRowsPerRowGroup),
		parquetgo.PageBufferSize(cfg.PageBufferSize),
	}
	options = append(options, writerMetadataOptions(meta)...)

	return &Writer{
		finalPath: finalPath,
		tempPath:  tempPath,
		file:      file,
		writer:    parquetgo.NewGenericWriter[Record](file, options...),
		state:     writerStateOpen,
	}, nil
}

// FinalPath returns the finalized parquet path.
func (w *Writer) FinalPath() string {
	if w == nil {
		return ""
	}
	return w.finalPath
}

// TempPath returns the temporary parquet path used before finalization.
func (w *Writer) TempPath() string {
	if w == nil {
		return ""
	}
	return w.tempPath
}

// WriteRows appends a batch of records to the parquet file.
// Returns errWriterClosed once Close or Abort has run.
func (w *Writer) WriteRows(rows []Record) error {
	if len(rows) == 0 {
		return nil
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	if w.state != writerStateOpen {
		return errWriterClosed
	}

	written, err := w.writer.Write(rows)
	if err != nil {
		return fmt.Errorf("write parquet rows: %w", err)
	}
	// Guard against a short write that the library did not report as an error.
	if written != len(rows) {
		return fmt.Errorf("write parquet rows: wrote %d of %d rows", written, len(rows))
	}
	return nil
}

// Close finalizes the parquet footer and publishes the file atomically.
// Idempotent: a second Close (or Close after Abort) is a no-op returning nil.
func (w *Writer) Close() error {
	if w == nil {
		return nil
	}

	// Snapshot fields and flip state under the lock, then do the slow
	// footer/rename work without holding it.
	w.mu.Lock()
	if w.state != writerStateOpen {
		w.mu.Unlock()
		return nil
	}
	file := w.file
	writer := w.writer
	tempPath := w.tempPath
	finalPath := w.finalPath
	w.state = writerStateClosed
	w.mu.Unlock()

	if err := writer.Close(); err != nil {
		// Best-effort cleanup of the temp file on failure.
		closeErr := file.Close()
		removeErr := os.Remove(tempPath)
		return errors.Join(fmt.Errorf("close parquet writer: %w", err), closeErr, removeErr)
	}
	if err := file.Close(); err != nil {
		removeErr := os.Remove(tempPath)
		return errors.Join(fmt.Errorf("close parquet file: %w", err), removeErr)
	}
	// Atomic publish: the final path only ever holds a complete file.
	if err := os.Rename(tempPath, finalPath); err != nil {
		return fmt.Errorf("rename parquet file %q to %q: %w", tempPath, finalPath, err)
	}
	return nil
}

// Abort discards the temporary parquet file. 
+func (w *Writer) Abort() error { + if w == nil { + return nil + } + + w.mu.Lock() + if w.state != writerStateOpen { + w.mu.Unlock() + return nil + } + file := w.file + tempPath := w.tempPath + w.state = writerStateAborted + w.mu.Unlock() + + closeErr := file.Close() + removeErr := os.Remove(tempPath) + if errors.Is(removeErr, os.ErrNotExist) { + removeErr = nil + } + return errors.Join(closeErr, removeErr) +} + +func normalizeWriterConfig(cfg WriterConfig) WriterConfig { + defaults := DefaultWriterConfig() + if cfg.MaxRowsPerRowGroup <= 0 { + cfg.MaxRowsPerRowGroup = defaults.MaxRowsPerRowGroup + } + if cfg.PageBufferSize <= 0 { + cfg.PageBufferSize = defaults.PageBufferSize + } + return cfg +} + +func normalizeOutputPaths(path string) (string, string, error) { + clean := filepath.Clean(strings.TrimSpace(path)) + if clean == "." || clean == "" { + return "", "", errors.New("parquet output path cannot be empty") + } + + lower := strings.ToLower(clean) + switch { + case strings.HasSuffix(lower, ".parquet.tmp"): + finalPath := strings.TrimSuffix(clean, ".tmp") + return finalPath, clean, nil + case strings.HasSuffix(lower, ".parquet"): + return clean, clean + ".tmp", nil + default: + finalPath := clean + ".parquet" + return finalPath, finalPath + ".tmp", nil + } +} diff --git a/internal/parquet/writer_test.go b/internal/parquet/writer_test.go new file mode 100644 index 0000000..0bd7d97 --- /dev/null +++ b/internal/parquet/writer_test.go @@ -0,0 +1,113 @@ +package parquet + +import ( + "io" + "os" + "path/filepath" + "reflect" + "testing" + + parquetgo "github.com/parquet-go/parquet-go" +) + +func TestWriterRoundTripAndFinalize(t *testing.T) { + dir := t.TempDir() + writer, err := NewWriter(filepath.Join(dir, "trace"), WriterConfig{}, FileMetadata{ + Hostname: "test-host", + StartedAtUnixNano: 1234, + Mode: "tui", + }) + if err != nil { + t.Fatalf("NewWriter() error = %v", err) + } + + rows := []Record{ + { + Seq: 1, + TimeNS: 10, + GapNS: 2, + LatencyNS: 5, + Comm: 
"cat", + PID: 11, + TID: 12, + Syscall: "read", + FD: 3, + Ret: 42, + Bytes: 42, + File: "/tmp/input", + IsError: false, + FilterEpoch: 7, + }, + { + Seq: 2, + TimeNS: 20, + GapNS: 3, + LatencyNS: 6, + Comm: "cp", + PID: 21, + TID: 22, + Syscall: "write", + FD: 4, + Ret: -1, + Bytes: 99, + File: "/tmp/output", + IsError: true, + FilterEpoch: 8, + }, + } + + if err := writer.WriteRows(rows); err != nil { + t.Fatalf("WriteRows() error = %v", err) + } + if _, err := os.Stat(writer.TempPath()); err != nil { + t.Fatalf("Stat(%q) error = %v, want temp file present", writer.TempPath(), err) + } + if _, err := os.Stat(writer.FinalPath()); !os.IsNotExist(err) { + t.Fatalf("Stat(%q) error = %v, want not-exist before Close", writer.FinalPath(), err) + } + + if err := writer.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + if _, err := os.Stat(writer.TempPath()); !os.IsNotExist(err) { + t.Fatalf("Stat(%q) error = %v, want temp removed after Close", writer.TempPath(), err) + } + if _, err := os.Stat(writer.FinalPath()); err != nil { + t.Fatalf("Stat(%q) error = %v, want finalized parquet file", writer.FinalPath(), err) + } + + got := readAllRecords(t, writer.FinalPath()) + if !reflect.DeepEqual(got, rows) { + t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, rows) + } +} + +func readAllRecords(t *testing.T, path string) []Record { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatalf("Open(%q) error = %v", path, err) + } + defer f.Close() + + reader := parquetgo.NewGenericReader[Record](f) + defer reader.Close() + + var rows []Record + buf := make([]Record, 4) + for { + n, err := reader.Read(buf) + if n > 0 { + rows = append(rows, buf[:n]...) + } + if err == nil { + continue + } + if err == io.EOF { + return rows + } + t.Fatalf("Read() error = %v", err) + } +} |
