diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-18 20:53:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-18 20:53:50 +0200 |
| commit | b421b2232351049277ee4ad5b31367bb2b6779bb (patch) | |
| tree | 6e185cbf1736bb0308231563b29eeab10160d682 /Magefile.go | |
| parent | c234be24815452fa1143c9a523f4615e0d745fb7 (diff) | |
feat: add ParquetValidate mage target and ClickHouse querying docs
Adds a `mage parquetValidate` target that validates a Parquet recording
via clickhouse-local in Docker (schema presence, row count, seq/time_ns
sanity). Adds docs/parquet-querying.md with schema reference, invocation
pattern, and seven example queries with representative outputs.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'Magefile.go')
| -rw-r--r-- | Magefile.go | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/Magefile.go b/Magefile.go index ecb4060..95e4c0c 100644 --- a/Magefile.go +++ b/Magefile.go @@ -26,6 +26,7 @@ import ( ) const ( + clickhouseImage = "clickhouse/clickhouse-server:latest" binaryName = "ior" workloadBinaryName = "ioworkload" defaultLibbpfgoPath = "../libbpfgo" @@ -999,3 +1000,148 @@ func isIntegrationTest(testName string) (bool, error) { } return false, nil } + +// ParquetValidate validates a Parquet file using clickhouse-local in Docker. +// Set PARQUET_FILE to override the default (latest *.parquet in repo root). +// Checks schema column presence, row count > 0, and basic sanity on seq/time_ns. +func ParquetValidate() error { + path, err := resolveParquetFile() + if err != nil { + return err + } + if err := checkDockerAvailable(); err != nil { + return err + } + abs, err := filepath.Abs(path) + if err != nil { + return fmt.Errorf("resolve absolute path for %s: %w", path, err) + } + dir, file := filepath.Dir(abs), filepath.Base(abs) + fmt.Printf("Validating parquet file: %s\n", abs) + return runParquetChecks(dir, file) +} + +// resolveParquetFile returns the parquet file path from PARQUET_FILE env or +// globs for the latest *.parquet in the repo root. +func resolveParquetFile() (string, error) { + if path := os.Getenv("PARQUET_FILE"); path != "" { + if _, err := os.Stat(path); err != nil { + return "", fmt.Errorf("PARQUET_FILE=%s: %w", path, err) + } + return path, nil + } + matches, err := filepath.Glob("*.parquet") + if err != nil { + return "", fmt.Errorf("glob *.parquet: %w", err) + } + if len(matches) == 0 { + return "", fmt.Errorf("no *.parquet files found in repo root; set PARQUET_FILE to specify one") + } + // Use the last match (lexicographically latest, which matches timestamp-named files). + return matches[len(matches)-1], nil +} + +// checkDockerAvailable verifies that Docker is reachable via `docker info`. +func checkDockerAvailable() error { + cmd := exec.Command("docker", "info") + cmd.Stdout = io.Discard + cmd.Stderr = io.Discard + if err := cmd.Run(); err != nil { + return fmt.Errorf("docker is not available (is the daemon running?): %w", err) + } + return nil +} + +// runClickHouseQuery runs a SQL query against a parquet file using clickhouse-local +// in Docker. The file must be in dir; it is mounted read-only at /data inside the +// container. Returns the trimmed stdout output. +func runClickHouseQuery(dir, file, sql string) (string, error) { + // Mount dir as /data read-only; pass the SQL via -q so no shell quoting is needed. + out, err := sh.Output("docker", "run", "--rm", + "-v", dir+":/data:ro", + clickhouseImage, + "clickhouse", "local", "-q", sql, + ) + if err != nil { + return "", fmt.Errorf("clickhouse query %q: %w", sql, err) + } + return strings.TrimSpace(out), nil +} + +// expectedParquetColumns lists the 14 column names that the parquet schema must contain. +var expectedParquetColumns = []string{ + "seq", "time_ns", "gap_ns", "latency_ns", "comm", + "pid", "tid", "syscall", "fd", "ret", + "bytes", "file", "is_error", "filter_epoch", +} + +// runParquetChecks runs schema, row-count, and sanity checks against the parquet file. +// dir is the absolute directory containing file (mounted at /data in the container). +func runParquetChecks(dir, file string) error { + dataFile := "/data/" + file + + // --- Schema check --- + fmt.Println("--- Schema check ---") + schemaOut, err := runClickHouseQuery(dir, file, + fmt.Sprintf("DESCRIBE TABLE file('%s', Parquet)", dataFile)) + if err != nil { + return err + } + fmt.Println(schemaOut) + // Verify all expected column names appear somewhere in the DESCRIBE output. + missing := []string{} + for _, col := range expectedParquetColumns { + if !strings.Contains(schemaOut, col) { + missing = append(missing, col) + } + } + if len(missing) > 0 { + return fmt.Errorf("schema check FAIL: missing columns: %s", strings.Join(missing, ", ")) + } + fmt.Printf("Schema check PASS: all %d expected columns present\n\n", len(expectedParquetColumns)) + + // --- Row count check --- + fmt.Println("--- Row count check ---") + countOut, err := runClickHouseQuery(dir, file, + fmt.Sprintf("SELECT count(*) FROM file('%s', Parquet)", dataFile)) + if err != nil { + return err + } + rowCount, err := strconv.ParseInt(strings.TrimSpace(countOut), 10, 64) + if err != nil { + return fmt.Errorf("parse row count %q: %w", countOut, err) + } + if rowCount <= 0 { + return fmt.Errorf("row count check FAIL: got %d rows, expected > 0", rowCount) + } + fmt.Printf("Row count check PASS: %d rows\n\n", rowCount) + + // --- Sanity check: seq and time_ns ranges --- + fmt.Println("--- Sanity check ---") + sanityOut, err := runClickHouseQuery(dir, file, + fmt.Sprintf("SELECT min(seq), max(seq), min(time_ns), countIf(is_error) FROM file('%s', Parquet)", dataFile)) + if err != nil { + return err + } + fmt.Println(sanityOut) + parts := strings.Split(sanityOut, "\t") + if len(parts) != 4 { + return fmt.Errorf("sanity check FAIL: unexpected output format (got %d tab-separated columns)", len(parts)) + } + minSeq, err1 := strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 64) + maxSeq, err2 := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) + minTimeNs, err3 := strconv.ParseUint(strings.TrimSpace(parts[2]), 10, 64) + if err1 != nil || err2 != nil || err3 != nil { + return fmt.Errorf("sanity check FAIL: could not parse numeric values from: %s", sanityOut) + } + if maxSeq <= minSeq { + return fmt.Errorf("sanity check FAIL: seq not monotonically increasing (min=%d, max=%d)", minSeq, maxSeq) + } + if minTimeNs == 0 { + return fmt.Errorf("sanity check FAIL: min(time_ns) is zero") + } + fmt.Printf("Sanity check PASS: seq range [%d, %d], min time_ns=%d, error_count=%s\n", + minSeq, maxSeq, minTimeNs, strings.TrimSpace(parts[3])) + + return nil +} |
