summaryrefslogtreecommitdiff
path: root/Magefile.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-18 20:53:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-18 20:53:50 +0200
commitb421b2232351049277ee4ad5b31367bb2b6779bb (patch)
tree6e185cbf1736bb0308231563b29eeab10160d682 /Magefile.go
parentc234be24815452fa1143c9a523f4615e0d745fb7 (diff)
feat: add ParquetValidate mage target and ClickHouse querying docs
Adds a `mage parquetValidate` target that validates a Parquet recording via clickhouse-local in Docker (schema presence, row count, seq/time_ns sanity). Adds docs/parquet-querying.md with schema reference, invocation pattern, and seven example queries with representative outputs. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'Magefile.go')
-rw-r--r--Magefile.go146
1 files changed, 146 insertions, 0 deletions
diff --git a/Magefile.go b/Magefile.go
index ecb4060..95e4c0c 100644
--- a/Magefile.go
+++ b/Magefile.go
@@ -26,6 +26,7 @@ import (
)
const (
+ clickhouseImage = "clickhouse/clickhouse-server:latest"
binaryName = "ior"
workloadBinaryName = "ioworkload"
defaultLibbpfgoPath = "../libbpfgo"
@@ -999,3 +1000,148 @@ func isIntegrationTest(testName string) (bool, error) {
}
return false, nil
}
+
+// ParquetValidate validates a Parquet file using clickhouse-local in Docker.
+// Set PARQUET_FILE to override the default (latest *.parquet in repo root).
+// Checks schema column presence, row count > 0, and basic sanity on seq/time_ns.
+func ParquetValidate() error {
+ path, err := resolveParquetFile()
+ if err != nil {
+ return err
+ }
+ if err := checkDockerAvailable(); err != nil {
+ return err
+ }
+ abs, err := filepath.Abs(path)
+ if err != nil {
+ return fmt.Errorf("resolve absolute path for %s: %w", path, err)
+ }
+ dir, file := filepath.Dir(abs), filepath.Base(abs)
+ fmt.Printf("Validating parquet file: %s\n", abs)
+ return runParquetChecks(dir, file)
+}
+
+// resolveParquetFile returns the parquet file path from PARQUET_FILE env or
+// globs for the latest *.parquet in the repo root.
+func resolveParquetFile() (string, error) {
+ if path := os.Getenv("PARQUET_FILE"); path != "" {
+ if _, err := os.Stat(path); err != nil {
+ return "", fmt.Errorf("PARQUET_FILE=%s: %w", path, err)
+ }
+ return path, nil
+ }
+ matches, err := filepath.Glob("*.parquet")
+ if err != nil {
+ return "", fmt.Errorf("glob *.parquet: %w", err)
+ }
+ if len(matches) == 0 {
+ return "", fmt.Errorf("no *.parquet files found in repo root; set PARQUET_FILE to specify one")
+ }
+ // Use the last match (lexicographically latest, which matches timestamp-named files).
+ return matches[len(matches)-1], nil
+}
+
+// checkDockerAvailable verifies that Docker is reachable via `docker info`.
+func checkDockerAvailable() error {
+ cmd := exec.Command("docker", "info")
+ cmd.Stdout = io.Discard
+ cmd.Stderr = io.Discard
+ if err := cmd.Run(); err != nil {
+ return fmt.Errorf("docker is not available (is the daemon running?): %w", err)
+ }
+ return nil
+}
+
+// runClickHouseQuery runs a SQL query against a parquet file using clickhouse-local
+// in Docker. The file must be in dir; it is mounted read-only at /data inside the
+// container. Returns the trimmed stdout output.
+func runClickHouseQuery(dir, file, sql string) (string, error) {
+ // Mount dir as /data read-only; pass the SQL via -q so no shell quoting is needed.
+ out, err := sh.Output("docker", "run", "--rm",
+ "-v", dir+":/data:ro",
+ clickhouseImage,
+ "clickhouse", "local", "-q", sql,
+ )
+ if err != nil {
+ return "", fmt.Errorf("clickhouse query %q: %w", sql, err)
+ }
+ return strings.TrimSpace(out), nil
+}
+
+// expectedParquetColumns lists the 14 column names that the parquet schema must contain.
+var expectedParquetColumns = []string{
+ "seq", "time_ns", "gap_ns", "latency_ns", "comm",
+ "pid", "tid", "syscall", "fd", "ret",
+ "bytes", "file", "is_error", "filter_epoch",
+}
+
+// runParquetChecks runs schema, row-count, and sanity checks against the parquet file.
+// dir is the absolute directory containing file (mounted at /data in the container).
+func runParquetChecks(dir, file string) error {
+ dataFile := "/data/" + file
+
+ // --- Schema check ---
+ fmt.Println("--- Schema check ---")
+ schemaOut, err := runClickHouseQuery(dir, file,
+ fmt.Sprintf("DESCRIBE TABLE file('%s', Parquet)", dataFile))
+ if err != nil {
+ return err
+ }
+ fmt.Println(schemaOut)
+ // Verify all expected column names appear somewhere in the DESCRIBE output.
+ missing := []string{}
+ for _, col := range expectedParquetColumns {
+ if !strings.Contains(schemaOut, col) {
+ missing = append(missing, col)
+ }
+ }
+ if len(missing) > 0 {
+ return fmt.Errorf("schema check FAIL: missing columns: %s", strings.Join(missing, ", "))
+ }
+ fmt.Printf("Schema check PASS: all %d expected columns present\n\n", len(expectedParquetColumns))
+
+ // --- Row count check ---
+ fmt.Println("--- Row count check ---")
+ countOut, err := runClickHouseQuery(dir, file,
+ fmt.Sprintf("SELECT count(*) FROM file('%s', Parquet)", dataFile))
+ if err != nil {
+ return err
+ }
+ rowCount, err := strconv.ParseInt(strings.TrimSpace(countOut), 10, 64)
+ if err != nil {
+ return fmt.Errorf("parse row count %q: %w", countOut, err)
+ }
+ if rowCount <= 0 {
+ return fmt.Errorf("row count check FAIL: got %d rows, expected > 0", rowCount)
+ }
+ fmt.Printf("Row count check PASS: %d rows\n\n", rowCount)
+
+ // --- Sanity check: seq and time_ns ranges ---
+ fmt.Println("--- Sanity check ---")
+ sanityOut, err := runClickHouseQuery(dir, file,
+ fmt.Sprintf("SELECT min(seq), max(seq), min(time_ns), countIf(is_error) FROM file('%s', Parquet)", dataFile))
+ if err != nil {
+ return err
+ }
+ fmt.Println(sanityOut)
+ parts := strings.Split(sanityOut, "\t")
+ if len(parts) != 4 {
+ return fmt.Errorf("sanity check FAIL: unexpected output format (got %d tab-separated columns)", len(parts))
+ }
+ minSeq, err1 := strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 64)
+ maxSeq, err2 := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64)
+ minTimeNs, err3 := strconv.ParseUint(strings.TrimSpace(parts[2]), 10, 64)
+ if err1 != nil || err2 != nil || err3 != nil {
+ return fmt.Errorf("sanity check FAIL: could not parse numeric values from: %s", sanityOut)
+ }
+ if maxSeq <= minSeq {
+ return fmt.Errorf("sanity check FAIL: seq not monotonically increasing (min=%d, max=%d)", minSeq, maxSeq)
+ }
+ if minTimeNs == 0 {
+ return fmt.Errorf("sanity check FAIL: min(time_ns) is zero")
+ }
+ fmt.Printf("Sanity check PASS: seq range [%d, %d], min time_ns=%d, error_count=%s\n",
+ minSeq, maxSeq, minTimeNs, strings.TrimSpace(parts[3]))
+
+ return nil
+}