summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-07 16:32:10 +0200
committerPaul Buetow <paul@buetow.org>2026-02-07 16:32:10 +0200
commit3fd46f3977fb650974e5e936cba362c787c00637 (patch)
treeb49111ddd0b7af4a007bca6a304dba10efcd88ff
reimport this PoC
-rw-r--r--AGENT.md63
-rw-r--r--CLAUDE.md1
-rw-r--r--Magefile.go253
-rw-r--r--README.md1000
-rwxr-xr-xbackfill-historic-data.sh60
-rwxr-xr-xbenchmark-100mb.sh223
-rwxr-xr-xbenchmark-1gb.sh223
-rwxr-xr-xcleanup-benchmark-data.sh88
-rwxr-xr-xcleanup-benchmark-metrics.sh83
-rwxr-xr-xgenerate-test-data.sh46
-rw-r--r--go.mod26
-rw-r--r--go.sum83
-rw-r--r--internal/config/config.go57
-rw-r--r--internal/config/config_test.go52
-rw-r--r--internal/ingester/auto.go145
-rw-r--r--internal/ingester/auto_test.go164
-rw-r--r--internal/ingester/clickhouse.go191
-rw-r--r--internal/ingester/pushgateway.go51
-rw-r--r--internal/ingester/pushgateway_test.go28
-rw-r--r--internal/ingester/remotewrite.go455
-rw-r--r--internal/ingester/remotewrite_test.go210
-rw-r--r--internal/metrics/generator.go85
-rw-r--r--internal/metrics/generator_test.go53
-rw-r--r--internal/metrics/sample.go34
-rw-r--r--internal/metrics/sample_test.go160
-rw-r--r--internal/parser/csv.go101
-rw-r--r--internal/parser/csv_test.go175
-rw-r--r--internal/parser/json.go62
-rw-r--r--internal/parser/json_test.go177
-rw-r--r--internal/parser/parser.go56
-rw-r--r--internal/parser/parser_test.go99
-rw-r--r--internal/parser/tabular_csv.go256
-rw-r--r--internal/parser/tabular_csv_test.go469
-rw-r--r--internal/resolver/dns_resolver.go274
-rw-r--r--internal/resolver/dns_resolver_test.go232
-rw-r--r--internal/version/version.go4
-rw-r--r--internal/watcher/file_watcher.go86
-rw-r--r--logo.pngbin0 -> 145949 bytes
-rwxr-xr-xrun.sh31
-rw-r--r--test-data/watch-clickhouse-test.csv12
-rwxr-xr-xverify-clickhouse.sh52
41 files changed, 5920 insertions, 0 deletions
diff --git a/AGENT.md b/AGENT.md
new file mode 100644
index 0000000..d5c22e7
--- /dev/null
+++ b/AGENT.md
@@ -0,0 +1,63 @@
+Follow ~/git/conf/snippets/go/go-projects.md
+
+## Grafana Dashboard Guidelines
+
+When creating or updating Grafana dashboards:
+
+### Sorting Requirements
+**ALWAYS ensure ALL panels are sorted by value (descending):**
+
+1. **Time Series Panels:**
+ - Add to `options.legend`: `"sortBy": "Last", "sortDesc": true`
+
+2. **Bar Gauge Panels:**
+ - Use `sort_desc()` in PromQL queries
+ - Example: `sort_desc(topk(10, sum by (label) (metric)))`
+
+3. **Pie/Donut Chart Panels:**
+ - Add to `options.legend`: `"sortBy": "Value", "sortDesc": true`
+
+4. **Table Panels:**
+ - Add to `options`: `"sortBy": [{"displayName": "ColumnName", "desc": true}]`
+
+### Example Panel Configuration
+
+**Time Series:**
+```json
+{
+ "type": "timeseries",
+ "options": {
+ "legend": {
+ "displayMode": "table",
+ "placement": "right",
+ "sortBy": "Last",
+ "sortDesc": true,
+ "calcs": ["lastNotNull", "mean", "max"]
+ }
+ }
+}
+```
+
+**Bar Gauge:**
+```json
+{
+ "type": "bargauge",
+ "targets": [{
+ "expr": "sort_desc(topk(15, sum by (cust) (metric)))"
+ }],
+ "options": {
+ "orientation": "horizontal",
+ "displayMode": "gradient"
+ }
+}
+```
+
+**Table:**
+```json
+{
+ "type": "table",
+ "options": {
+ "sortBy": [{"displayName": "Count", "desc": true}]
+ }
+}
+```
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..02d954c
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1 @@
+Follow AGENT.md
diff --git a/Magefile.go b/Magefile.go
new file mode 100644
index 0000000..3cce6e0
--- /dev/null
+++ b/Magefile.go
@@ -0,0 +1,253 @@
+//go:build mage
+// +build mage
+
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/magefile/mage/mg"
+ "github.com/magefile/mage/sh"
+)
+
+const (
+ binaryName = "epimetheus"
+ mainPath = "./cmd/epimetheus"
+)
+
+// Default target to run when none is specified
+var Default = Build
+
+// Build compiles the epimetheus binary
+func Build() error {
+ fmt.Println("Building epimetheus...")
+ return sh.RunV("go", "build", "-o", binaryName, mainPath)
+}
+
+// Install installs the binary to $GOPATH/bin
+func Install() error {
+ fmt.Println("Installing epimetheus...")
+ return sh.RunV("go", "install", mainPath)
+}
+
+// Run executes the epimetheus binary in realtime mode
+func Run() error {
+ mg.Deps(Build)
+ fmt.Println("Running epimetheus in realtime mode...")
+ return sh.RunV("./"+binaryName, "-mode=realtime", "-continuous")
+}
+
+// RunHistoric runs epimetheus in historic mode
+func RunHistoric() error {
+ mg.Deps(Build)
+ fmt.Println("Running epimetheus in historic mode (24 hours ago)...")
+ return sh.RunV("./"+binaryName, "-mode=historic", "-hours-ago=24")
+}
+
+// RunAuto runs epimetheus in auto mode with a file
+func RunAuto(file string) error {
+ mg.Deps(Build)
+ if file == "" {
+ return fmt.Errorf("file parameter required: mage RunAuto <file>")
+ }
+ fmt.Printf("Running epimetheus in auto mode with file: %s\n", file)
+ return sh.RunV("./"+binaryName, "-mode=auto", "-file="+file)
+}
+
+// RunWatchClickHouse runs epimetheus in watch mode with ClickHouse ingestion
+func RunWatchClickHouse(file string) error {
+ mg.Deps(Build)
+ if file == "" {
+ file = "test-data/watch-clickhouse-test.csv"
+ }
+ fmt.Printf("Running epimetheus in watch mode with ClickHouse (file: %s)\n", file)
+ return sh.RunV("./"+binaryName, "-mode=watch", "-file="+file, "-metric-name=watch_test",
+ "-clickhouse=http://localhost:8123", "-prometheus=")
+}
+
+// Test runs all tests
+func Test() error {
+ fmt.Println("Running tests...")
+ return sh.RunV("go", "test", "./...", "-v")
+}
+
+// TestCoverage runs tests with coverage report
+func TestCoverage() error {
+ fmt.Println("Running tests with coverage...")
+ if err := sh.RunV("go", "test", "./...", "-cover", "-coverprofile=coverage.out"); err != nil {
+ return err
+ }
+ return sh.RunV("go", "tool", "cover", "-html=coverage.out", "-o", "coverage.html")
+}
+
+// TestRace runs tests with race detector
+func TestRace() error {
+ fmt.Println("Running tests with race detector...")
+ return sh.RunV("go", "test", "./...", "-race", "-v")
+}
+
+// Benchmark runs all benchmarks
+func Benchmark() error {
+ fmt.Println("Running benchmarks...")
+ return sh.RunV("go", "test", "./...", "-bench=.", "-benchmem")
+}
+
+// Lint runs golangci-lint
+func Lint() error {
+ fmt.Println("Running linter...")
+ return sh.RunV("golangci-lint", "run", "./...")
+}
+
+// Fmt formats all Go code
+func Fmt() error {
+ fmt.Println("Formatting code...")
+ return sh.RunV("go", "fmt", "./...")
+}
+
+// Vet runs go vet
+func Vet() error {
+ fmt.Println("Running go vet...")
+ return sh.RunV("go", "vet", "./...")
+}
+
+// Tidy runs go mod tidy
+func Tidy() error {
+ fmt.Println("Tidying dependencies...")
+ return sh.RunV("go", "mod", "tidy")
+}
+
+// Clean removes build artifacts
+func Clean() error {
+ fmt.Println("Cleaning build artifacts...")
+ files := []string{
+ binaryName,
+ "coverage.out",
+ "coverage.html",
+ }
+ for _, f := range files {
+ if err := sh.Rm(f); err != nil && !os.IsNotExist(err) {
+ return err
+ }
+ }
+ return nil
+}
+
+// Generate runs go generate
+func Generate() error {
+ fmt.Println("Running go generate...")
+ return sh.RunV("go", "generate", "./...")
+}
+
+// Version prints the version
+func Version() error {
+ fmt.Println("Printing version...")
+ mg.Deps(Build)
+ return sh.RunV("./"+binaryName, "-version")
+}
+
+// All runs format, vet, test, and build
+func All() {
+ mg.Deps(Fmt, Vet, Test, Build)
+}
+
+// CI runs the full CI pipeline (tidy, vet, test, build)
+func CI() error {
+ fmt.Println("Running CI pipeline...")
+ mg.Deps(Tidy, Vet, Test)
+ return Build()
+}
+
+// Dev starts development mode with port-forwarding
+func Dev() error {
+ mg.Deps(Build)
+ fmt.Println("Starting development mode...")
+ fmt.Println("Setting up port-forward to Pushgateway...")
+
+ // Start port-forward in background
+ portForwardCmd := sh.RunCmd("kubectl", "port-forward", "-n", "monitoring", "svc/pushgateway", "9091:9091")
+ go func() {
+ if err := portForwardCmd(); err != nil {
+ fmt.Printf("Port-forward error: %v\n", err)
+ }
+ }()
+
+ fmt.Println("Running epimetheus in realtime mode...")
+ return sh.RunV("./"+binaryName, "-mode=realtime", "-continuous")
+}
+
+// GenerateTestData creates test data files
+func GenerateTestData() error {
+ fmt.Println("Generating test data...")
+ return sh.RunV("./generate-test-data.sh")
+}
+
+// Backfill runs backfill for the last 48 hours
+func Backfill() error {
+ mg.Deps(Build)
+ fmt.Println("Running backfill (last 48 hours)...")
+ return sh.RunV("./"+binaryName, "-mode=backfill", "-start-hours=48", "-end-hours=0", "-interval=1")
+}
+
+// Benchmark100MB runs the 100MB benchmark
+func Benchmark100MB() error {
+ fmt.Println("Running 100MB benchmark...")
+ return sh.RunV("./benchmark-100mb.sh")
+}
+
+// Benchmark1GB runs the 1GB benchmark
+func Benchmark1GB() error {
+ fmt.Println("Running 1GB benchmark...")
+ return sh.RunV("./benchmark-1gb.sh")
+}
+
+// CleanupBenchmarkData removes benchmark data from Prometheus
+func CleanupBenchmarkData() error {
+ fmt.Println("Cleaning up benchmark data...")
+ return sh.RunV("./cleanup-benchmark-data.sh")
+}
+
+// CleanupBenchmarkMetrics removes benchmark metric files
+func CleanupBenchmarkMetrics() error {
+ fmt.Println("Cleaning up benchmark metric files...")
+ return sh.RunV("./cleanup-benchmark-metrics.sh")
+}
+
+// DeployDashboard deploys the Grafana dashboard
+func DeployDashboard() error {
+ fmt.Println("Deploying Grafana dashboard...")
+ return sh.RunV("./deploy-dashboard.sh")
+}
+
+// Help prints available targets
+func Help() {
+ fmt.Println("Available targets:")
+ fmt.Println(" build - Build the epimetheus binary (default)")
+ fmt.Println(" install - Install the binary to $GOPATH/bin")
+ fmt.Println(" run - Build and run in realtime mode")
+ fmt.Println(" runHistoric - Build and run in historic mode")
+ fmt.Println(" runAuto <file> - Build and run in auto mode with file")
+ fmt.Println(" runWatchClickHouse [file] - Build and run watch mode with ClickHouse (default: test-data/watch-clickhouse-test.csv)")
+ fmt.Println(" test - Run all tests")
+ fmt.Println(" testCoverage - Run tests with coverage report")
+ fmt.Println(" testRace - Run tests with race detector")
+ fmt.Println(" benchmark - Run Go benchmarks")
+ fmt.Println(" lint - Run golangci-lint")
+ fmt.Println(" fmt - Format all Go code")
+ fmt.Println(" vet - Run go vet")
+ fmt.Println(" tidy - Run go mod tidy")
+ fmt.Println(" clean - Remove build artifacts")
+ fmt.Println(" generate - Run go generate")
+ fmt.Println(" version - Print version")
+ fmt.Println(" all - Run fmt, vet, test, and build")
+ fmt.Println(" ci - Run full CI pipeline")
+ fmt.Println(" dev - Start development mode with port-forwarding")
+ fmt.Println(" generateTestData - Generate test data files")
+ fmt.Println(" backfill - Run backfill for last 48 hours")
+ fmt.Println(" benchmark100MB - Run 100MB benchmark")
+ fmt.Println(" benchmark1GB - Run 1GB benchmark")
+ fmt.Println(" cleanupBenchmarkData - Clean up benchmark data from Prometheus")
+ fmt.Println(" cleanupBenchmarkMetrics - Clean up benchmark metric files")
+ fmt.Println(" deployDashboard - Deploy Grafana dashboard")
+ fmt.Println(" help - Print this help message")
+}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ba10a76
--- /dev/null
+++ b/README.md
@@ -0,0 +1,1000 @@
+<div align="center">
+ <img src="logo.png" alt="Epimetheus Logo" width="400"/>
+</div>
+
+# Epimetheus
+
+A versatile Go tool for pushing metrics to Prometheus with support for both realtime and historic data ingestion.
+
+## Why "Epimetheus"?
+
+In Greek mythology, [Epimetheus](https://en.wikipedia.org/wiki/Epimetheus_(mythology)) is Prometheus's brother, whose name means "afterthought" or "hindsight" (while Prometheus means "forethought"). This name cleverly captures the tool's purpose: bringing data to Prometheus **after** collection, whether it's historic data from hours, days, or weeks ago, or realtime data pushed on-demand.
+
+While Epimetheus is sometimes depicted as foolish in myths (he accepted Pandora's box despite warnings), this tool embraces the "afterthought" aspect productively - it's never too late to bring your metrics home to Prometheus!
+
+## Architecture
+
+```
+┌─────────────────────────────────────────────────────────────────────────┐
+│ Epimetheus │
+│ (Metrics Ingestion Tool) │
+│ │
+│ Modes: │
+│ • Realtime - Current metrics (< 5 min old) │
+│ • Historic - Historic metrics (≥ 5 min old) │
+│ • Backfill - Range of historic data │
+│ • Auto - Automatic routing based on timestamp age │
+└─────────────────────────────────────────────────────────────────────────┘
+ │ │
+ │ Realtime Data │ Historic Data
+ │ (via HTTP POST) │ (via Remote Write API)
+ │ Uses "now" timestamp │ Preserves timestamps
+ ▼ ▼
+┌─────────────────────┐ ┌─────────────────────┐
+│ Pushgateway │ │ Prometheus │
+│ (Port 9091) │ │ (Port 9090) │
+│ │ │ │
+│ • Buffers metrics │ │ Remote Write API: │
+│ • Scraped by │──── Scraped ─────▶ │ /api/v1/write │
+│ Prometheus │ every 15-30s │ │
+│ • No timestamp │ │ Feature Required: │
+│ preservation │ │ --enable-feature= │
+│ │ │ remote-write- │
+│ │ │ receiver │
+└─────────────────────┘ └─────────────────────┘
+ │
+ │ Prometheus Query API
+ │ /api/v1/query
+ ▼
+ ┌─────────────────────┐
+ │ Grafana │
+ │ (Port 3000) │
+ │ │
+ │ • Prometheus as │
+ │ datasource │
+ │ • Dashboards: │
+ │ - Epimetheus │
+ │ Test Metrics │
+ │ • Auto-refresh │
+ └─────────────────────┘
+```
+
+### Data Flow
+
+1. **Realtime Path** (for current data):
+ - Epimetheus → Pushgateway (HTTP POST)
+ - Prometheus scrapes Pushgateway periodically
+ - Timestamp = "now" when Prometheus scrapes
+
+2. **Historic Path** (for old data):
+ - Epimetheus → Prometheus Remote Write API (HTTP POST)
+ - Direct write to Prometheus TSDB
+ - Timestamp preserved from original data
+
+3. **Visualization**:
+ - Grafana queries Prometheus
+ - Displays metrics in dashboards
+ - Auto-refresh every 10 seconds
+
+## Overview
+
+**epimetheus** is a standalone binary that:
+- **Generates** realistic example metrics simulating production applications
+- **Pushes** metrics via Pushgateway (realtime) or Remote Write API (historic)
+- **Automatically detects** timestamp age and chooses the optimal ingestion method
+- **Supports** multiple data formats (CSV, JSON) and all Prometheus metric types
+- **Provides** Grafana dashboard for visualizing test metrics
+
+## Quick Start
+
+### 1. Deploy Pushgateway (one-time setup)
+
+The Pushgateway Helm chart is available in the [conf repository](https://codeberg.org/snonux/conf) at `f3s/pushgateway/helm-chart`.
+
+```bash
+# Clone the conf repository if you haven't already
+git clone https://codeberg.org/snonux/conf.git
+cd conf/f3s/pushgateway/helm-chart
+
+# Deploy Pushgateway
+helm upgrade --install pushgateway . -n monitoring --create-namespace
+```
+
+Alternatively, deploy Pushgateway using the official chart:
+
+```bash
+helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
+helm install pushgateway prometheus-community/prometheus-pushgateway -n monitoring --create-namespace
+```
+
+### 2. Run in Realtime Mode
+
+```bash
+# Port-forward Pushgateway
+kubectl port-forward -n monitoring svc/pushgateway 9091:9091 &
+
+# Push test metrics continuously
+cd /home/paul/git/conf/f3s/epimetheus
+./epimetheus -mode=realtime -continuous
+```
+
+The binary pushes metrics every 15 seconds. Press Ctrl+C to stop.
+
+### 3. View Metrics
+
+```bash
+# Pushgateway UI
+open http://localhost:9091
+
+# Prometheus UI
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 &
+open http://localhost:9090
+```
+
+## Operating Modes
+
+### 👁️ Watch Mode
+Monitor CSV files for changes and push metrics to Prometheus with file modification timestamps.
+
+**Works with ANY CSV format** - automatically detects numeric vs string columns and sanitizes names.
+
+**NEW: Automatic DNS Resolution** - IP addresses are automatically resolved to hostnames for better observability in Grafana.
+
+```bash
+./epimetheus -mode=watch \
+ -file=mydata.csv \
+ -metric-name=myapp \
+ -prometheus=http://localhost:9090/api/v1/write
+```
+
+**Features:**
+- 🔍 **Format-agnostic**: Works with any tabular CSV structure
+- 📊 **Automatic detection**: Numeric columns → metrics, String columns → labels
+- 🏷️ **Name sanitization**: `min(potatoes)`, `avg(time)`, `p99(latency)` → valid metric names
+- 🌐 **DNS Resolution**: IP addresses → hostnames (e.g., `10.50.52.61` → `foo.example.lan`)
+- 💾 **Smart Caching**: In-memory cache prevents redundant DNS lookups
+- ⏱️ **Timestamp preservation**: Uses file modification time
+- 🔄 **Continuous monitoring**: Polls file every 1 second
+- 💪 **Error resilient**: Continues watching despite failures
+- 🎯 **Remote Write**: Pushes to Prometheus (preserves timestamps)
+
+**CSV Format:**
+Works with any tabular CSV:
+- First row: column headers (automatically sanitized)
+- Subsequent rows: data values
+- Column names can be anything: `min(x)`, `avg(y)`, `p99(latency)`, etc.
+
+**Example 1** - Web metrics:
+```csv
+avg(response_time),p99(latency),endpoint,method
+45.2,120.5,/api/users,GET
+52.1,135.8,/api/orders,POST
+```
+
+Generates:
+```promql
+web_avg_response_time{endpoint="/api/users",method="GET"} 45.2
+web_p99_latency{endpoint="/api/users",method="GET"} 120.5
+web_avg_response_time{endpoint="/api/orders",method="POST"} 52.1
+web_p99_latency{endpoint="/api/orders",method="POST"} 135.8
+```
+
+**Example 2** - Food metrics:
+```csv
+min(potatoes),last(coke),avg(price),country,store_type
+5.2,10.5,12.99,USA,grocery
+3.8,8.2,9.99,Canada,convenience
+```
+
+Generates:
+```promql
+food_min_potatoes{country="USA",store_type="grocery"} 5.2
+food_last_coke{country="USA",store_type="grocery"} 10.5
+food_avg_price{country="USA",store_type="grocery"} 12.99
+# ... etc
+```
+
+Each row generates N samples (N = number of numeric columns).
+
+See [CSV-FORMAT-FLEXIBILITY.md](CSV-FORMAT-FLEXIBILITY.md) for more examples.
+
+**Options:**
+- `-file` - CSV file to watch (required)
+- `-metric-name` - Base metric name (required, e.g., `food`, `network`, `database`)
+- `-prometheus` - Prometheus Remote Write URL (default: http://localhost:9090/api/v1/write)
+- `-clickhouse` - ClickHouse HTTP URL (e.g. http://localhost:8123) to also ingest metrics
+- `-clickhouse-table` - ClickHouse table name (default: epimetheus_metrics)
+- `-job` - Job name for metrics (default: example_metrics_pusher)
+- `-resolve-ip-labels` - Additional IP labels to resolve via DNS (default: ip is always resolved)
+
+**ClickHouse Support:**
+Watch mode can ingest to ClickHouse in addition to (or instead of) Prometheus:
+
+```bash
+# Ingest to both Prometheus and ClickHouse
+./epimetheus -mode=watch -file=data.csv -metric-name=myapp \
+ -prometheus=http://localhost:9090/api/v1/write \
+ -clickhouse=http://localhost:8123
+
+# ClickHouse only (use -prometheus= to disable Prometheus)
+./epimetheus -mode=watch -file=test-data/watch-clickhouse-test.csv \
+ -metric-name=watch_test -clickhouse=http://localhost:8123 -prometheus=
+
+# Verify data in ClickHouse
+./verify-clickhouse.sh
+```
+
+**DNS Resolution:**
+By default, the `ip` label is automatically resolved to a hostname. To resolve additional IP labels:
+
+```bash
+./epimetheus -mode=watch \
+ -file=network.csv \
+ -metric-name=network \
+ -resolve-ip-labels=source_ip,dest_ip
+```
+
+This will resolve: `ip` (default) + `source_ip` + `dest_ip`
+
+**Example:**
+- Input: `ip="10.50.52.61"`
+- Output: `ip="foo.example.lan"`
+- Failed lookups: IP remains unchanged
+
+**Documentation:**
+- [DNS-RESOLUTION-FEATURE.md](DNS-RESOLUTION-FEATURE.md) - Complete DNS resolution guide
+- [CSV-FORMAT-FLEXIBILITY.md](CSV-FORMAT-FLEXIBILITY.md) - Works with ANY CSV format
+- [DTAIL-METRICS-EXAMPLE.md](DTAIL-METRICS-EXAMPLE.md) - Detailed dtail.csv example
+
+### 🔄 Realtime Mode (Default)
+Push current metrics to Pushgateway with "now" timestamp.
+
+```bash
+./epimetheus -mode=realtime -continuous
+```
+
+**Options:**
+- `-pushgateway` - Pushgateway URL (default: http://localhost:9091)
+- `-job` - Job name (default: example_metrics_pusher)
+- `-continuous` - Keep pushing every 15 seconds
+
+### ⏰ Historic Mode
+Push a single datapoint from the past using Remote Write API.
+
+```bash
+# Port-forward Prometheus
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 &
+
+# Push data from 24 hours ago
+./epimetheus -mode=historic -hours-ago=24
+```
+
+**Options:**
+- `-prometheus` - Prometheus URL (default: http://localhost:9090/api/v1/write)
+- `-hours-ago` - Hours in the past (default: 24)
+
+### 📦 Backfill Mode
+Import a range of historic data points.
+
+```bash
+# Backfill last 48 hours with 1-hour intervals
+./epimetheus -mode=backfill -start-hours=48 -end-hours=0 -interval=1
+
+# Backfill last week with 6-hour intervals
+./epimetheus -mode=backfill -start-hours=168 -end-hours=0 -interval=6
+```
+
+**Options:**
+- `-start-hours` - Start time in hours ago
+- `-end-hours` - End time in hours ago (0 = now)
+- `-interval` - Interval between points in hours
+
+### 🤖 Auto Mode (Recommended!)
+Automatically detect timestamp age and route to the correct ingestion method.
+
+```bash
+# Generate test data
+./generate-test-data.sh
+
+# Import mixed current and historic data
+./epimetheus -mode=auto -file=test-all-ages.csv
+```
+
+**Detection Logic:**
+- Data < 5 minutes old → Pushgateway (realtime)
+- Data ≥ 5 minutes old → Remote Write (historic)
+
+**Options:**
+- `-file` - Input file path
+- `-format` - Data format: csv or json (default: csv)
+- `-pushgateway` - Pushgateway URL
+- `-prometheus` - Prometheus Remote Write URL
+
+## Data Formats
+
+### CSV Format
+
+```csv
+# Format: metric_name,labels,value,timestamp_ms
+# Labels: key1=value1;key2=value2
+epimetheus_test_requests_total,instance=web1;env=prod,100,1767125148000
+epimetheus_test_temperature_celsius,instance=web2,22.5,1767038748000
+
+# Timestamp is optional (uses "now" if omitted)
+epimetheus_test_active_connections,instance=web3,42,
+```
+
+### JSON Format
+
+```json
+[
+ {
+ "metric": "epimetheus_test_requests_total",
+ "labels": {"instance": "web1", "env": "prod"},
+ "value": 100,
+ "timestamp_ms": 1767125148000
+ },
+ {
+ "metric": "epimetheus_test_temperature_celsius",
+ "labels": {"instance": "web2"},
+ "value": 22.5,
+ "timestamp_ms": 1767038748000
+ }
+]
+```
+
+## Test Metrics
+
+All generated metrics use the `epimetheus_test_` prefix to clearly identify them as test data.
+
+### Counter: `epimetheus_test_requests_total`
+- **Type:** Counter (monotonically increasing)
+- **Description:** Total number of requests processed
+- **Use case:** Counting total events, requests, errors
+
+### Gauge: `epimetheus_test_active_connections`
+- **Type:** Gauge (can increase or decrease)
+- **Description:** Current number of active connections (0-100)
+- **Use case:** Current state measurements, capacity
+
+### Gauge: `epimetheus_test_temperature_celsius`
+- **Type:** Gauge
+- **Description:** Current temperature in Celsius (0-50°C)
+- **Use case:** Environmental monitoring
+
+### Histogram: `epimetheus_test_request_duration_seconds`
+- **Type:** Histogram (distribution)
+- **Description:** Request duration distribution
+- **Buckets:** 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 seconds
+- **Use case:** Latency measurements, SLO tracking
+
+### Labeled Counter: `epimetheus_test_jobs_processed_total`
+- **Type:** Counter with labels
+- **Description:** Jobs processed by type and status
+- **Labels:**
+ - `job_type`: email, report, backup
+ - `status`: success, failed
+- **Use case:** Categorized counting, multi-dimensional metrics
+
+## Grafana Dashboard
+
+A comprehensive dashboard is available showcasing all test metrics.
+
+### Dashboard Features
+
+- **8 Panels:**
+ 1. Request Rate (line graph)
+ 2. Total Requests (stat panel)
+ 3. Active Connections (gauge with thresholds)
+ 4. Temperature (gauge with thresholds)
+ 5. Request Duration Histogram (p50, p90, p99)
+ 6. Average Request Duration (stat)
+ 7. Jobs Processed by Type (bar gauge)
+ 8. Jobs Status Breakdown (table)
+
+- **Auto-refresh:** Every 10 seconds
+- **Time range:** Last 15 minutes (customizable)
+- **Dark theme optimized**
+
+### Deploy Dashboard
+
+#### Option 1: Helm/Kubernetes ConfigMap (Recommended)
+
+```bash
+# Deploy via Kubernetes ConfigMap
+kubectl apply -f ../prometheus/epimetheus-dashboard.yaml
+```
+
+The dashboard will be automatically discovered by Grafana.
+
+#### Option 2: Manual Import
+
+```bash
+# Port-forward Grafana
+kubectl port-forward -n monitoring svc/prometheus-grafana 3000:80
+
+# Open Grafana
+open http://localhost:3000
+
+# Go to Dashboards → Import → Upload grafana-dashboard.json
+```
+
+#### Option 3: Automated Script
+
+```bash
+# Deploy via API
+./deploy-dashboard.sh
+
+# Or with custom credentials
+GRAFANA_URL="http://localhost:3000" \
+GRAFANA_USER="admin" \
+GRAFANA_PASSWORD="yourpassword" \
+./deploy-dashboard.sh
+```
+
+## Example Queries
+
+### Basic Queries
+
+```promql
+# View total requests
+epimetheus_test_requests_total
+
+# View request rate over last 5 minutes
+rate(epimetheus_test_requests_total[5m])
+
+# View current active connections
+epimetheus_test_active_connections
+
+# View current temperature
+epimetheus_test_temperature_celsius
+```
+
+### Histogram Queries
+
+```promql
+# 95th percentile request duration
+histogram_quantile(0.95, rate(epimetheus_test_request_duration_seconds_bucket[5m]))
+
+# 50th percentile (median)
+histogram_quantile(0.50, rate(epimetheus_test_request_duration_seconds_bucket[5m]))
+
+# Average request duration
+rate(epimetheus_test_request_duration_seconds_sum[5m]) /
+rate(epimetheus_test_request_duration_seconds_count[5m])
+```
+
+### Labeled Counter Queries
+
+```promql
+# Failed jobs by type
+epimetheus_test_jobs_processed_total{status="failed"}
+
+# Job success rate
+rate(epimetheus_test_jobs_processed_total{status="success"}[5m]) /
+rate(epimetheus_test_jobs_processed_total[5m])
+
+# Total jobs by type
+sum by (job_type) (epimetheus_test_jobs_processed_total)
+```
+
+### Curl Examples
+
+```bash
+# Port-forward Prometheus
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 &
+
+# Query total requests
+curl -s "http://localhost:9090/api/v1/query?query=epimetheus_test_requests_total" | jq .
+
+# Query temperature
+curl -s "http://localhost:9090/api/v1/query?query=epimetheus_test_temperature_celsius" | jq .
+
+# Query request rate
+curl -s "http://localhost:9090/api/v1/query?query=rate(epimetheus_test_requests_total[5m])" | jq .
+
+# Query histogram p95
+curl -s "http://localhost:9090/api/v1/query?query=histogram_quantile(0.95,rate(epimetheus_test_request_duration_seconds_bucket[5m]))" | jq .
+```
+
+## Time Range Limitations
+
+### ✅ Supported Time Ranges
+
+| Time Range | Status | Method |
+|------------|--------|--------|
+| Current (< 5 min) | ✅ Works | Pushgateway |
+| 1 hour old | ✅ Works | Remote Write |
+| 1 day old | ✅ Works | Remote Write |
+| 1 week old | ✅ Works | Remote Write |
+| 1 month old | ✅ Works | Remote Write |
+
+### ⚠️ Potential Issues
+
+- **Future timestamps:** Rejected (> 5 minutes in future)
+- **Very old data (6+ months):** May be rejected depending on Prometheus retention
+- **Years old:** Likely rejected - use `promtool tsdb create-blocks-from` instead
+- **Out-of-order samples:** Can't insert older data into existing time series (use different labels)
+
+### Prometheus Configuration
+
+Check your retention settings:
+
+```bash
+# View retention
+kubectl get prometheus -n monitoring prometheus-kube-prometheus-prometheus \
+ -o jsonpath='{.spec.retention}'
+
+# Default is typically 15 days
+```
+
+For very old data:
+- Increase retention in Prometheus config
+- Enable out-of-order ingestion (experimental)
+- Use `promtool` for direct TSDB block creation
+
+## Project Structure
+
+```
+epimetheus/
+├── cmd/
+│ └── epimetheus/
+│ └── main.go # Main entry point
+├── internal/
+│ ├── config/ # Configuration
+│ ├── metrics/ # Metric generators
+│ ├── parser/ # CSV/JSON parsers (includes tabular CSV)
+│ ├── ingester/ # Pushgateway & Remote Write ingesters
+│ └── watcher/ # File watcher for watch mode
+├── epimetheus # Compiled binary
+├── grafana-dashboard.json # Grafana dashboard definition
+├── deploy-dashboard.sh # Dashboard deployment script
+├── generate-test-data.sh # Test data generator
+├── run.sh # Helper script
+└── README.md # This file
+```
+
+## Setup Requirements
+
+### 1. Enable Prometheus Remote Write Receiver ⚠️ **REQUIRED for Historic Data**
+
+**IMPORTANT**: To use historic mode, backfill mode, or auto mode with old data, you **must** enable the Prometheus Remote Write receiver. Without this feature, Epimetheus can only push realtime data via Pushgateway.
+
+The Remote Write receiver is configured in the [conf repository](https://codeberg.org/snonux/conf) at `f3s/prometheus/persistence-values.yaml`:
+
+```yaml
+# In prometheus/persistence-values.yaml (from conf repository)
+prometheus:
+ prometheusSpec:
+ # Enable Remote Write receiver endpoint and Admin API (Prometheus 3.x syntax)
+ additionalArgs:
+ - name: web.enable-remote-write-receiver
+ value: ""
+ - name: web.enable-admin-api
+ value: ""
+
+    # Additional Prometheus feature flags (note: out-of-order ingestion for
+    # backfilling is enabled via the tsdb.outOfOrderTimeWindow setting below)
+ enableFeatures:
+ - exemplar-storage
+ - otlp-write-receiver
+
+ # Allow backfilling up to 31 days in the past (provides 1-day buffer for 30-day datasets)
+ tsdb:
+ outOfOrderTimeWindow: 744h # 31 days
+```
+
+**What This Enables:**
+- **Remote Write API**: HTTP endpoint at `/api/v1/write` for ingesting metrics with custom timestamps
+- **Admin API**: HTTP endpoints at `/api/v1/admin/tsdb/*` for data deletion and management
+- **Out-of-Order Ingestion**: Allows writing data points older than existing data for the same time series
+- **31-Day Window**: Can backfill data up to 31 days in the past (provides 1-day buffer for 30-day datasets)
+
+After updating the configuration, upgrade your Prometheus installation:
+
+```bash
+cd conf/f3s/prometheus
+just upgrade # Or manually:
+# helm upgrade prometheus prometheus-community/kube-prometheus-stack \
+# -n monitoring -f persistence-values.yaml
+```
+
+Verify the features are enabled:
+
+```bash
+# Check Remote Write receiver flag
+kubectl get pod -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 \
+ -o jsonpath='{.spec.containers[0].args}' | grep -o "web.enable-remote-write-receiver"
+
+# Check out-of-order time window
+kubectl get prometheus -n monitoring prometheus-kube-prometheus-prometheus \
+ -o jsonpath='{.spec.tsdb.outOfOrderTimeWindow}'
+# Should output: 744h
+
+# Check admin API flag
+kubectl get pod -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 \
+ -o jsonpath='{.spec.containers[0].args}' | grep -o "web.enable-admin-api"
+```
+
+**Performance Considerations:**
+
+This configuration is designed for ad-hoc troubleshooting and development, **NOT production use**. Enabling these features has trade-offs:
+
+- **Increased Memory Usage**: Out-of-order ingestion requires additional memory for buffering and sorting time series
+- **Higher TSDB Overhead**: Prometheus TSDB needs to handle non-sequential writes, increasing disk I/O
+- **Query Performance**: Queries may be slower due to fragmented data blocks
+- **Storage Amplification**: Out-of-order samples can trigger additional compactions, increasing storage usage
+
+**Recommendation for Production:**
+- Keep `outOfOrderTimeWindow` as small as possible (or disabled)
+- Monitor Prometheus memory and disk usage closely
+- Use Remote Write only when necessary
+- Consider using dedicated testing/development Prometheus instances
+
+**Note**: The syntax changed in Prometheus 3.x - use `additionalArgs` with `web.enable-remote-write-receiver` instead of the deprecated `enableFeatures: [remote-write-receiver]`.
+
+### 2. Update Prometheus Scrape Config
+
+Ensure Pushgateway is in scrape targets:
+
+```yaml
+# additional-scrape-configs.yaml
+- job_name: 'pushgateway'
+ honor_labels: true
+ static_configs:
+ - targets:
+ - 'pushgateway.monitoring.svc.cluster.local:9091'
+```
+
+Apply the configuration:
+
+```bash
+kubectl create secret generic additional-scrape-configs \
+ --from-file=/home/paul/git/conf/f3s/prometheus/additional-scrape-configs.yaml \
+ --dry-run=client -o yaml -n monitoring | kubectl apply -f -
+```
+
+## Building from Source
+
+### Using Mage (Recommended)
+
+This project includes a [Magefile](./MAGEFILE.md) for easy building, testing, and running:
+
+```bash
+# Install Mage (one-time setup)
+go install github.com/magefile/mage@latest
+
+# Build binary
+mage build
+
+# Run tests
+mage test
+
+# Run with coverage report
+mage testCoverage
+
+# Run in realtime mode
+mage run
+
+# See all available targets
+mage -l
+```
+
+See [MAGEFILE.md](./MAGEFILE.md) for complete documentation.
+
+### Using Go directly
+
+```bash
+# Build binary
+go build -o epimetheus cmd/epimetheus/main.go
+
+# Run tests
+go test ./... -v
+
+# Check test coverage
+go test ./... -cover
+```
+
+## Troubleshooting
+
+### Binary can't connect to Pushgateway
+
+```bash
+# Check port-forward is running
+ps aux | grep "port-forward.*9091"
+
+# Restart port-forward
+kubectl port-forward -n monitoring svc/pushgateway 9091:9091
+```
+
+### Metrics not appearing in Prometheus
+
+```bash
+# Check Pushgateway has metrics
+curl http://localhost:9091/metrics | grep "prometheus_pusher_test"
+
+# Check Prometheus scrape targets
+# Open http://localhost:9090/targets - look for "pushgateway" job
+
+# Check Prometheus logs
+kubectl logs -n monitoring -l app.kubernetes.io/name=prometheus
+```
+
+### "Remote write receiver not enabled" error
+
+```bash
+# Verify feature is enabled
+kubectl logs -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 | grep "remote-write-receiver"
+
+# Should see: msg="Experimental features enabled" features=[remote-write-receiver]
+```
+
+### "Out of order sample" error
+
+This occurs when trying to insert data older than existing data for the same time series.
+
+**Solutions:**
+- Use different job labels for historic data (e.g., `job="historic_data"`)
+- Enable out-of-order ingestion in Prometheus (experimental)
+- Ensure backfill goes from oldest to newest
+
+### Dashboard not appearing in Grafana
+
+```bash
+# Check ConfigMap exists
+kubectl get configmap -n monitoring | grep epimetheus
+
+# Check labels
+kubectl get configmap epimetheus-dashboard -n monitoring -o yaml | grep "grafana_dashboard"
+
+# Restart Grafana to force reload
+kubectl rollout restart deployment/prometheus-grafana -n monitoring
+```
+
+## Architecture
+
+```
+┌─────────────────┐
+│ Go Binary │
+│ (prometheus- │──Push realtime──┐
+│ pusher) │ │
+└─────────────────┘ ▼
+ │ ┌──────────────────┐
+ │ │ Pushgateway │◄──Scrape──┐
+ │ │ (Port 9091) │ │
+ │ └──────────────────┘ │
+ │ │
+ └──Push historic──────────────────┐ │
+ ▼ │
+ ┌─────────────────┐ │
+ │ Prometheus │◄────┘
+ │ (Port 9090) │
+ │ Remote Write API│
+ └─────────────────┘
+ │
+ │ Datasource
+ ▼
+ ┌─────────────────┐
+ │ Grafana │
+ │ (Port 3000) │
+ │ Dashboards │
+ └─────────────────┘
+```
+
+## Best Practices
+
+### When to Use Pushgateway vs. Remote Write
+
+**Use Pushgateway (realtime mode):**
+- Short-lived batch jobs
+- Service-level metrics
+- Jobs behind firewalls
+- Current/recent data (< 5 minutes old)
+
+**Use Remote Write (historic mode):**
+- Historic data import
+- Backfilling gaps
+- Data migration
+- Data older than 5 minutes
+
+**Use Auto Mode:**
+- Mixed current and historic data
+- Importing from files
+- Unknown timestamp ages
+- General-purpose ingestion
+
+### Metric Design
+
+- **Use appropriate metric types:**
+ - Counter for cumulative values (requests, errors)
+ - Gauge for point-in-time values (temperature, connections)
+ - Histogram for distributions (latency, sizes)
+
+- **Label cardinality:**
+ - Include meaningful labels
+ - Avoid high-cardinality labels (user IDs, timestamps)
+ - Keep label combinations reasonable (< 1000 per metric)
+
+- **Naming conventions:**
+ - Use descriptive names
+ - Include units in gauge names (\_celsius, \_bytes)
+ - Use \_total suffix for counters
+
+## Cleanup
+
+### Cleaning Up Benchmark Data from Prometheus
+
+For cleaning up benchmark metrics from Prometheus, use the provided cleanup script:
+
+```bash
+# Port-forward to Prometheus
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 &
+
+# Run the cleanup script
+./cleanup-benchmark-data.sh
+```
+
+The script will:
+1. Delete all `epimetheus_benchmark_*` metrics using the Prometheus Admin API
+2. Clean up tombstones to free disk space
+3. Provide clear success/error feedback
+
+**Manual cleanup** (if you prefer):
+
+```bash
+# Delete specific metric
+curl -X POST 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]=epimetheus_benchmark_cpu_usage'
+
+# Clean up tombstones
+curl -X POST 'http://localhost:9090/api/v1/admin/tsdb/clean_tombstones'
+```
+
+### Other Cleanup Tasks
+
+```bash
+# Stop port-forwards
+pkill -f "port-forward.*9091"
+pkill -f "port-forward.*9090"
+pkill -f "port-forward.*3000"
+
+# Delete test metrics from Pushgateway
+curl -X DELETE http://localhost:9091/metrics/job/example_metrics_pusher
+
+# Uninstall Pushgateway (if needed)
+helm uninstall pushgateway -n monitoring
+```
+
+## MacOS Setup
+
+### Basic Installation
+
+```bash
+brew install prometheus
+brew install grafana
+go install github.com/prometheus/pushgateway@latest
+brew services start grafana
+brew services start prometheus
+~/go/bin/pushgateway &
+```
+
+Once done, log in to http://localhost:3000 as admin:admin; you will be prompted to change the password. Afterwards, add http://localhost:9090 as a Prometheus datasource.
+
+### Enable Remote Write Receiver (Required for Watch Mode)
+
+⚠️ **Important**: Watch mode, historic mode, backfill mode, and auto mode require the Prometheus Remote Write receiver to be enabled.
+
+#### Option 1: Permanent Configuration (Recommended)
+
+Edit the Prometheus arguments file:
+
+```bash
+# Edit the arguments file
+nano /opt/homebrew/etc/prometheus.args
+```
+
+Add this line at the end:
+```
+--web.enable-remote-write-receiver
+```
+
+The complete file should look like:
+```
+--config.file /opt/homebrew/etc/prometheus.yml
+--web.listen-address=127.0.0.1:9090
+--storage.tsdb.path /opt/homebrew/var/prometheus
+--web.enable-remote-write-receiver
+--web.enable-admin-api
+```
+
+**Note:** `--web.enable-admin-api` is optional but recommended for easier data management (allows deleting old metrics).
+
+Restart Prometheus:
+```bash
+brew services restart prometheus
+```
+
+Verify it's working:
+```bash
+# Check Prometheus is healthy
+curl http://localhost:9090/-/healthy
+
+# Test Remote Write endpoint (should return 400, not 404)
+curl -X POST http://localhost:9090/api/v1/write
+```
+
+#### Option 2: Temporary (For Testing)
+
+Stop the service and start manually:
+
+```bash
+# Stop brew service
+brew services stop prometheus
+
+# Start with Remote Write enabled
+prometheus --web.enable-remote-write-receiver
+```
+
+Keep this terminal open. In another terminal, run your epimetheus commands.
+
+**Note**: This only lasts until you stop the terminal. Use Option 1 for permanent setup.
+
+### Clearing Old Metrics (Optional)
+
+If you need to delete old metrics and start fresh:
+
+```bash
+# Delete specific metrics (e.g., blockstore)
+curl -X POST -g 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]={__name__=~"blockstore_.*"}'
+
+# Clean up deleted data
+curl -X POST http://localhost:9090/api/v1/admin/tsdb/clean_tombstones
+
+# Wait a moment for cleanup
+sleep 2
+```
+
+**Note:** Admin API must be enabled (add `--web.enable-admin-api` to prometheus.args).
+
+### Verify Setup
+
+Once Remote Write is enabled, test watch mode:
+
+```bash
+# Create a test CSV
+cat > /tmp/test.csv << EOF
+status,count,method
+200,100,GET
+404,50,POST
+EOF
+
+# Watch the file
+./epimetheus -mode=watch \
+ -file=/tmp/test.csv \
+ -metric-name=test \
+ -prometheus=http://localhost:9090/api/v1/write
+```
+
+You should see:
+```
+✅ Successfully pushed X samples to Prometheus
+```
+
+Query in Prometheus (http://localhost:9090):
+```promql
+{__name__=~"test_.*"}
+```
+
+## Additional Resources
+
+- [Prometheus Documentation](https://prometheus.io/docs/)
+- [Pushgateway Documentation](https://github.com/prometheus/pushgateway)
+- [Prometheus Remote Write Spec](https://prometheus.io/docs/concepts/remote_write_spec/)
+- [Grafana Documentation](https://grafana.com/docs/)
+
+## Version
+
+Current version: 0.0.0
+
+## License
+
+See LICENSE file for details.
diff --git a/backfill-historic-data.sh b/backfill-historic-data.sh
new file mode 100755
index 0000000..fa0e065
--- /dev/null
+++ b/backfill-historic-data.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+# Backfill historic data to Prometheus for Epimetheus dashboard
+
+set -e
+
+echo "=== Epimetheus Historic Data Backfill ==="
+echo ""
+echo "This script will populate Prometheus with historic test data"
+echo "going back 7 days, with data points every 12 hours."
+echo ""
+
+# Port-forward to Prometheus
+echo "Step 1: Setting up port-forward to Prometheus..."
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/epimetheus-prom-pf.log 2>&1 &
+PF_PID=$!
+echo "Port-forward started (PID: $PF_PID)"
+
+# Wait for port-forward to be ready
+sleep 5
+
+# Run backfill
+echo ""
+echo "Step 2: Backfilling data from 7 days ago to now (12-hour intervals)..."
+echo ""
+./epimetheus -mode=backfill \
+ -prometheus=http://localhost:9090/api/v1/write \
+ -start-hours=168 \
+ -end-hours=0 \
+ -interval=12
+
+EXIT_CODE=$?
+
+# Clean up
+echo ""
+echo "Step 3: Cleaning up port-forward..."
+kill $PF_PID 2>/dev/null || true
+
+if [ $EXIT_CODE -eq 0 ]; then
+ echo ""
+ echo "✅ Historic data backfill complete!"
+ echo ""
+ echo "The Grafana dashboard timeline should now show data from:"
+ echo " - 7 days ago"
+ echo " - 6 days ago"
+ echo " - 5 days ago"
+ echo " - 4 days ago"
+ echo " - 3 days ago"
+ echo " - 2 days ago"
+ echo " - 1 day ago"
+ echo " - 12 hours ago"
+ echo " - Now (from previous realtime push)"
+ echo ""
+ echo "View the dashboard at: https://grafana.f3s.buetow.org/d/epimetheus-test/epimetheus-test-metrics"
+else
+ echo ""
+ echo "❌ Backfill failed with exit code $EXIT_CODE"
+ echo "Check /tmp/epimetheus-prom-pf.log for port-forward logs"
+fi
+
+exit $EXIT_CODE
diff --git a/benchmark-100mb.sh b/benchmark-100mb.sh
new file mode 100755
index 0000000..1d3fad0
--- /dev/null
+++ b/benchmark-100mb.sh
@@ -0,0 +1,234 @@
+#!/bin/bash
+# Benchmark script: Generate and ingest 100MB of historic metrics
+# This tests Epimetheus performance with large-scale data ingestion
+
+set -e
+
+# Optimize Go GC for better performance (Phase 3 optimization)
+export GOGC=200        # Reduce GC frequency (default 100)
+export GOMEMLIMIT=3GiB # Set memory limit for Go 1.19+
+
+BENCHMARK_DIR="benchmark-results"
+TIMESTAMP=$(date +%Y%m%d-%H%M%S)
+RESULT_FILE="$BENCHMARK_DIR/benchmark-$TIMESTAMP.log"
+
+mkdir -p "$BENCHMARK_DIR"
+
+echo "=== Epimetheus 100MB Benchmark ===" | tee "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Timestamp: $(date)" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 1: Generate 100MB of test data
+echo "Step 1: Generating 100MB of test data..." | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Calculate: ~70 bytes per line, 100MB = ~1.5M lines
+TARGET_SIZE_MB=100
+TARGET_BYTES=$((TARGET_SIZE_MB * 1024 * 1024))
+BYTES_PER_LINE=70
+TARGET_LINES=$((TARGET_BYTES / BYTES_PER_LINE))
+
+echo "Target size: ${TARGET_SIZE_MB}MB" | tee -a "$RESULT_FILE"
+echo "Estimated lines needed: $TARGET_LINES" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Generate data going back 7 days with 1-minute intervals
+# This gives us ~10,080 data points across 7 days
+# We'll generate multiple metrics per timestamp to reach 100MB
+# All data is historic (> 5 minutes old) to use Remote Write API exclusively
+
+GENERATION_START=$(date +%s)
+
+NOW=$(date +%s)000                           # Current time in milliseconds
+ONE_HOUR_AGO=$((NOW - 3600000))              # Start from 1 hour ago to ensure all data is historic
+SEVEN_DAYS_AGO=$((ONE_HOUR_AGO - 604800000)) # 7 days before that
+
+# CSV header
+cat > benchmark-data-100mb.csv << 'EOF'
+# Prometheus metrics - 100MB benchmark dataset
+# Format: metric_name,labels,value,timestamp_ms
+EOF
+
+# Generate metrics
+# We'll create ~150 unique time series, each with ~10,000 data points = 1.5M samples
+METRICS=(
+  "epimetheus_benchmark_cpu_usage"
+  "epimetheus_benchmark_memory_bytes"
+  "epimetheus_benchmark_disk_io_bytes"
+  "epimetheus_benchmark_network_rx_bytes"
+  "epimetheus_benchmark_network_tx_bytes"
+  "epimetheus_benchmark_requests_total"
+  "epimetheus_benchmark_errors_total"
+  "epimetheus_benchmark_response_time_ms"
+  "epimetheus_benchmark_active_connections"
+  "epimetheus_benchmark_queue_depth"
+)
+
+INSTANCES=(
+  "web-01" "web-02" "web-03" "web-04" "web-05"
+  "api-01" "api-02" "api-03" "api-04" "api-05"
+  "db-01" "db-02" "db-03" "worker-01" "worker-02"
+)
+
+INTERVAL_MS=60000     # 1 minute interval
+TOTAL_INTERVALS=10080 # 7 days of 1-minute intervals
+
+echo "Generating data..." | tee -a "$RESULT_FILE"
+LINES_GENERATED=0
+
+# The whole loop writes through ONE appending redirection (see `done`
+# below): the previous `echo ... >> file` inside the innermost loop
+# opened and closed the CSV once per sample (~1.5M open/close syscalls).
+# Progress goes to stderr so the CR-terminated status line neither lands
+# in the CSV nor litters the log file with control characters.
+# The loop variable is TS, not TIMESTAMP, to avoid clobbering the
+# script-level TIMESTAMP used in RESULT_FILE.
+for ((i=0; i<TOTAL_INTERVALS; i++)); do
+  TS=$((SEVEN_DAYS_AGO + (i * INTERVAL_MS)))
+
+  # Generate a sample for each metric x instance combination
+  for METRIC in "${METRICS[@]}"; do
+    for INSTANCE in "${INSTANCES[@]}"; do
+      VALUE=$((RANDOM % 1000))
+      echo "$METRIC,instance=$INSTANCE;env=benchmark,$VALUE,$TS"
+      LINES_GENERATED=$((LINES_GENERATED + 1))
+    done
+  done
+
+  # Progress indicator every 1000 intervals
+  if [ $((i % 1000)) -eq 0 ]; then
+    PROGRESS=$((i * 100 / TOTAL_INTERVALS))
+    echo -ne "\rProgress: $PROGRESS% ($LINES_GENERATED lines)" >&2
+  fi
+done >> benchmark-data-100mb.csv
+
+echo "" | tee -a "$RESULT_FILE"
+
+GENERATION_END=$(date +%s)
+GENERATION_TIME=$((GENERATION_END - GENERATION_START))
+
+# Get actual file size (BSD stat first, GNU stat fallback)
+FILE_SIZE=$(stat -f%z benchmark-data-100mb.csv 2>/dev/null || stat -c%s benchmark-data-100mb.csv 2>/dev/null)
+FILE_SIZE_MB=$((FILE_SIZE / 1024 / 1024))
+
+echo "" | tee -a "$RESULT_FILE"
+echo "Data generation complete:" | tee -a "$RESULT_FILE"
+echo "  Lines generated: $LINES_GENERATED" | tee -a "$RESULT_FILE"
+echo "  File size: ${FILE_SIZE_MB}MB ($FILE_SIZE bytes)" | tee -a "$RESULT_FILE"
+echo "  Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 2: Start port-forward to Prometheus
+echo "Step 2: Setting up port-forward to Prometheus..." | tee -a "$RESULT_FILE"
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/benchmark-pf.log 2>&1 &
+PF_PID=$!
+echo "Port-forward started (PID: $PF_PID)" | tee -a "$RESULT_FILE"
+# Never leak the port-forward: `set -e` can abort the script in any of
+# the following steps before the explicit cleanup in Step 7 runs.
+trap 'kill $PF_PID 2>/dev/null || true' EXIT
+sleep 8 # Wait for port-forward to be ready
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 3: Get baseline Prometheus metrics
+echo "Step 3: Collecting baseline Prometheus metrics..." | tee -a "$RESULT_FILE"
+PROM_POD=$(kubectl get pod -n monitoring -l app.kubernetes.io/name=prometheus -o jsonpath='{.items[0].metadata.name}')
+echo "Prometheus pod: $PROM_POD" | tee -a "$RESULT_FILE"
+
+# Get memory and CPU usage before ingestion
+BASELINE_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}')
+BASELINE_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}')
+
+echo "  Baseline memory: $BASELINE_MEMORY" | tee -a "$RESULT_FILE"
+echo "  Baseline CPU: $BASELINE_CPU" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 4: Run ingestion benchmark
+echo "Step 4: Running ingestion benchmark..." | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+INGEST_START=$(date +%s.%N)
+
+# Run epimetheus with time measurement
+# Use CSV mode with Remote Write API (all data is historic)
+# Note: We can't use auto mode because it requires both Pushgateway and Remote Write
+# Instead, we'll implement a direct CSV->Remote Write ingestion
+
+echo "Parsing CSV and preparing for Remote Write ingestion..." | tee -a "$RESULT_FILE"
+
+# For now, use backfill mode to process the CSV data
+# We'll need to enhance epimetheus to support pure CSV->RemoteWrite mode
+echo "WARNING: Using auto mode - this may fail if data is too recent" | tee -a "$RESULT_FILE"
+echo "Continuing with Remote Write API for historic data..." | tee -a "$RESULT_FILE"
+
+# NOTE: `/usr/bin/time -v` is GNU time; on macOS install gnu-time or drop -v.
+/usr/bin/time -v ./epimetheus \
+  -mode=auto \
+  -file=benchmark-data-100mb.csv \
+  -format=csv \
+  -prometheus=http://localhost:9090/api/v1/write \
+  -pushgateway=http://localhost:9091 \
+  2>&1 | tee -a "$RESULT_FILE" || true # Continue even if pushgateway fails
+
+INGEST_END=$(date +%s.%N)
+
+# Calculate ingestion time
+INGEST_TIME=$(echo "$INGEST_END - $INGEST_START" | bc)
+
+echo "" | tee -a "$RESULT_FILE"
+echo "Ingestion complete:" | tee -a "$RESULT_FILE"
+echo "  Total time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE"
+
+# Calculate throughput
+SAMPLES_PER_SECOND=$(echo "scale=2; $LINES_GENERATED / $INGEST_TIME" | bc)
+MB_PER_SECOND=$(echo "scale=2; $FILE_SIZE_MB / $INGEST_TIME" | bc)
+
+echo "  Samples/second: $SAMPLES_PER_SECOND" | tee -a "$RESULT_FILE"
+echo "  MB/second: $MB_PER_SECOND" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 5: Get post-ingestion Prometheus metrics
+echo "Step 5: Collecting post-ingestion Prometheus metrics..." | tee -a "$RESULT_FILE"
+sleep 5 # Wait for metrics to stabilize
+
+POST_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}')
+POST_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}')
+
+echo "  Post-ingestion memory: $POST_MEMORY" | tee -a "$RESULT_FILE"
+echo "  Post-ingestion CPU: $POST_CPU" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 6: Query some data to verify ingestion
+echo "Step 6: Verifying data ingestion..." | tee -a "$RESULT_FILE"
+QUERY_RESULT=$(curl -s "http://localhost:9090/api/v1/query?query=count(epimetheus_benchmark_cpu_usage)" | jq -r '.data.result[0].value[1]')
+echo "  Samples found for epimetheus_benchmark_cpu_usage: $QUERY_RESULT" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 7: Cleanup
+echo "Step 7: Cleaning up..." | tee -a "$RESULT_FILE"
+kill $PF_PID 2>/dev/null || true
+trap - EXIT
+echo "" | tee -a "$RESULT_FILE"
+
+# Summary
+echo "=== BENCHMARK SUMMARY ===" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Dataset:" | tee -a "$RESULT_FILE"
+echo "  Size: ${FILE_SIZE_MB}MB" | tee -a "$RESULT_FILE"
+echo "  Samples: $LINES_GENERATED" | tee -a "$RESULT_FILE"
+echo "  Time range: 7 days" | tee -a "$RESULT_FILE"
+echo "  Interval: 1 minute" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Performance:" | tee -a "$RESULT_FILE"
+echo "  Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE"
+echo "  Ingestion time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE"
+echo "  Throughput: $SAMPLES_PER_SECOND samples/s" | tee -a "$RESULT_FILE"
+echo "  Throughput: $MB_PER_SECOND MB/s" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Resources:" | tee -a "$RESULT_FILE"
+echo "  Memory: $BASELINE_MEMORY -> $POST_MEMORY" | tee -a "$RESULT_FILE"
+echo "  CPU: $BASELINE_CPU -> $POST_CPU" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Results saved to: $RESULT_FILE" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "To view results: cat $RESULT_FILE"
+echo "To analyze: less $RESULT_FILE"
diff --git a/benchmark-1gb.sh b/benchmark-1gb.sh
new file mode 100755
index 0000000..f715376
--- /dev/null
+++ b/benchmark-1gb.sh
@@ -0,0 +1,234 @@
+#!/bin/bash
+# Benchmark script: Generate and ingest 1GB of historic metrics
+# This tests Epimetheus performance with large-scale data ingestion
+
+set -e
+
+# Optimize Go GC for better performance (Phase 3 optimization)
+export GOGC=200        # Reduce GC frequency (default 100)
+export GOMEMLIMIT=3GiB # Set memory limit for Go 1.19+
+
+BENCHMARK_DIR="benchmark-results"
+TIMESTAMP=$(date +%Y%m%d-%H%M%S)
+RESULT_FILE="$BENCHMARK_DIR/benchmark-1gb-$TIMESTAMP.log"
+
+mkdir -p "$BENCHMARK_DIR"
+
+echo "=== Epimetheus 1GB Benchmark ===" | tee "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Timestamp: $(date)" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 1: Generate 1GB of test data
+echo "Step 1: Generating 1GB of test data..." | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Calculate: ~80 bytes per line, 1GB = ~13M lines
+TARGET_SIZE_MB=1000
+TARGET_BYTES=$((TARGET_SIZE_MB * 1024 * 1024))
+BYTES_PER_LINE=80
+TARGET_LINES=$((TARGET_BYTES / BYTES_PER_LINE))
+
+echo "Target size: ${TARGET_SIZE_MB}MB" | tee -a "$RESULT_FILE"
+echo "Estimated lines needed: $TARGET_LINES" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Generate data going back 30 days with 30-second intervals
+# This gives us ~86,400 data points across 30 days (30 days = 720h, which
+# fits within the 744h out-of-order window configured for Prometheus)
+# We'll generate multiple metrics per timestamp to reach 1GB
+# All data is historic (> 5 minutes old) to use Remote Write API exclusively
+
+GENERATION_START=$(date +%s)
+
+NOW=$(date +%s)000                             # Current time in milliseconds
+ONE_HOUR_AGO=$((NOW - 3600000))                # Start from 1 hour ago to ensure all data is historic
+THIRTY_DAYS_AGO=$((ONE_HOUR_AGO - 2592000000)) # 30 days before that (30 * 24 * 60 * 60 * 1000)
+
+# CSV header
+cat > benchmark-data-1gb.csv << 'EOF'
+# Prometheus metrics - 1GB benchmark dataset
+# Format: metric_name,labels,value,timestamp_ms
+EOF
+
+# Generate metrics
+# We'll create ~150 unique time series, each with ~86,400 data points = 13M samples
+METRICS=(
+  "epimetheus_benchmark_cpu_usage"
+  "epimetheus_benchmark_memory_bytes"
+  "epimetheus_benchmark_disk_io_bytes"
+  "epimetheus_benchmark_network_rx_bytes"
+  "epimetheus_benchmark_network_tx_bytes"
+  "epimetheus_benchmark_requests_total"
+  "epimetheus_benchmark_errors_total"
+  "epimetheus_benchmark_response_time_ms"
+  "epimetheus_benchmark_active_connections"
+  "epimetheus_benchmark_queue_depth"
+)
+
+INSTANCES=(
+  "web-01" "web-02" "web-03" "web-04" "web-05"
+  "api-01" "api-02" "api-03" "api-04" "api-05"
+  "db-01" "db-02" "db-03" "worker-01" "worker-02"
+)
+
+INTERVAL_MS=30000     # 30 second interval (to maintain 1GB size with 30 days)
+TOTAL_INTERVALS=86400 # 30 days of 30-second intervals
+
+echo "Generating data..." | tee -a "$RESULT_FILE"
+LINES_GENERATED=0
+
+# The whole loop writes through ONE appending redirection (see `done`
+# below): the previous `echo ... >> file` inside the innermost loop
+# opened and closed the CSV once per sample (~13M open/close syscalls).
+# Progress goes to stderr so the CR-terminated status line neither lands
+# in the CSV nor litters the log file with control characters.
+# The loop variable is TS, not TIMESTAMP, to avoid clobbering the
+# script-level TIMESTAMP used in RESULT_FILE.
+for ((i=0; i<TOTAL_INTERVALS; i++)); do
+  TS=$((THIRTY_DAYS_AGO + (i * INTERVAL_MS)))
+
+  # Generate a sample for each metric x instance combination
+  for METRIC in "${METRICS[@]}"; do
+    for INSTANCE in "${INSTANCES[@]}"; do
+      VALUE=$((RANDOM % 1000))
+      echo "$METRIC,instance=$INSTANCE;env=benchmark,$VALUE,$TS"
+      LINES_GENERATED=$((LINES_GENERATED + 1))
+    done
+  done
+
+  # Progress indicator every 5000 intervals
+  if [ $((i % 5000)) -eq 0 ]; then
+    PROGRESS=$((i * 100 / TOTAL_INTERVALS))
+    echo -ne "\rProgress: $PROGRESS% ($LINES_GENERATED lines)" >&2
+  fi
+done >> benchmark-data-1gb.csv
+
+echo "" | tee -a "$RESULT_FILE"
+
+GENERATION_END=$(date +%s)
+GENERATION_TIME=$((GENERATION_END - GENERATION_START))
+
+# Get actual file size (BSD stat first, GNU stat fallback)
+FILE_SIZE=$(stat -f%z benchmark-data-1gb.csv 2>/dev/null || stat -c%s benchmark-data-1gb.csv 2>/dev/null)
+FILE_SIZE_MB=$((FILE_SIZE / 1024 / 1024))
+
+echo "" | tee -a "$RESULT_FILE"
+echo "Data generation complete:" | tee -a "$RESULT_FILE"
+echo "  Lines generated: $LINES_GENERATED" | tee -a "$RESULT_FILE"
+echo "  File size: ${FILE_SIZE_MB}MB ($FILE_SIZE bytes)" | tee -a "$RESULT_FILE"
+echo "  Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 2: Start port-forward to Prometheus
+echo "Step 2: Setting up port-forward to Prometheus..." | tee -a "$RESULT_FILE"
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/benchmark-pf.log 2>&1 &
+PF_PID=$!
+echo "Port-forward started (PID: $PF_PID)" | tee -a "$RESULT_FILE"
+# Never leak the port-forward: `set -e` can abort the script in any of
+# the following steps before the explicit cleanup in Step 7 runs.
+trap 'kill $PF_PID 2>/dev/null || true' EXIT
+sleep 8 # Wait for port-forward to be ready
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 3: Get baseline Prometheus metrics
+echo "Step 3: Collecting baseline Prometheus metrics..." | tee -a "$RESULT_FILE"
+PROM_POD=$(kubectl get pod -n monitoring -l app.kubernetes.io/name=prometheus -o jsonpath='{.items[0].metadata.name}')
+echo "Prometheus pod: $PROM_POD" | tee -a "$RESULT_FILE"
+
+# Get memory and CPU usage before ingestion
+BASELINE_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}')
+BASELINE_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}')
+
+echo "  Baseline memory: $BASELINE_MEMORY" | tee -a "$RESULT_FILE"
+echo "  Baseline CPU: $BASELINE_CPU" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 4: Run ingestion benchmark
+echo "Step 4: Running ingestion benchmark..." | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+INGEST_START=$(date +%s.%N)
+
+# Run epimetheus with time measurement
+# Use CSV mode with Remote Write API (all data is historic)
+# Note: We can't use auto mode because it requires both Pushgateway and Remote Write
+# Instead, we'll implement a direct CSV->Remote Write ingestion
+
+echo "Parsing CSV and preparing for Remote Write ingestion..." | tee -a "$RESULT_FILE"
+
+# For now, use backfill mode to process the CSV data
+# We'll need to enhance epimetheus to support pure CSV->RemoteWrite mode
+echo "WARNING: Using auto mode - this may fail if data is too recent" | tee -a "$RESULT_FILE"
+echo "Continuing with Remote Write API for historic data..." | tee -a "$RESULT_FILE"
+
+# NOTE: `/usr/bin/time -v` is GNU time; on macOS install gnu-time or drop -v.
+/usr/bin/time -v ./epimetheus \
+  -mode=auto \
+  -file=benchmark-data-1gb.csv \
+  -format=csv \
+  -prometheus=http://localhost:9090/api/v1/write \
+  -pushgateway=http://localhost:9091 \
+  2>&1 | tee -a "$RESULT_FILE" || true # Continue even if pushgateway fails
+
+INGEST_END=$(date +%s.%N)
+
+# Calculate ingestion time
+INGEST_TIME=$(echo "$INGEST_END - $INGEST_START" | bc)
+
+echo "" | tee -a "$RESULT_FILE"
+echo "Ingestion complete:" | tee -a "$RESULT_FILE"
+echo "  Total time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE"
+
+# Calculate throughput
+SAMPLES_PER_SECOND=$(echo "scale=2; $LINES_GENERATED / $INGEST_TIME" | bc)
+MB_PER_SECOND=$(echo "scale=2; $FILE_SIZE_MB / $INGEST_TIME" | bc)
+
+echo "  Samples/second: $SAMPLES_PER_SECOND" | tee -a "$RESULT_FILE"
+echo "  MB/second: $MB_PER_SECOND" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 5: Get post-ingestion Prometheus metrics
+echo "Step 5: Collecting post-ingestion Prometheus metrics..." | tee -a "$RESULT_FILE"
+sleep 5 # Wait for metrics to stabilize
+
+POST_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}')
+POST_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}')
+
+echo "  Post-ingestion memory: $POST_MEMORY" | tee -a "$RESULT_FILE"
+echo "  Post-ingestion CPU: $POST_CPU" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 6: Query some data to verify ingestion
+echo "Step 6: Verifying data ingestion..." | tee -a "$RESULT_FILE"
+QUERY_RESULT=$(curl -s "http://localhost:9090/api/v1/query?query=count(epimetheus_benchmark_cpu_usage)" | jq -r '.data.result[0].value[1]')
+echo "  Samples found for epimetheus_benchmark_cpu_usage: $QUERY_RESULT" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+
+# Step 7: Cleanup
+echo "Step 7: Cleaning up..." | tee -a "$RESULT_FILE"
+kill $PF_PID 2>/dev/null || true
+trap - EXIT
+echo "" | tee -a "$RESULT_FILE"
+
+# Summary
+echo "=== BENCHMARK SUMMARY ===" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Dataset:" | tee -a "$RESULT_FILE"
+echo "  Size: ${FILE_SIZE_MB}MB" | tee -a "$RESULT_FILE"
+echo "  Samples: $LINES_GENERATED" | tee -a "$RESULT_FILE"
+echo "  Time range: 30 days" | tee -a "$RESULT_FILE"
+echo "  Interval: 30 seconds" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Performance:" | tee -a "$RESULT_FILE"
+echo "  Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE"
+echo "  Ingestion time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE"
+echo "  Throughput: $SAMPLES_PER_SECOND samples/s" | tee -a "$RESULT_FILE"
+echo "  Throughput: $MB_PER_SECOND MB/s" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Resources:" | tee -a "$RESULT_FILE"
+echo "  Memory: $BASELINE_MEMORY -> $POST_MEMORY" | tee -a "$RESULT_FILE"
+echo "  CPU: $BASELINE_CPU -> $POST_CPU" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "Results saved to: $RESULT_FILE" | tee -a "$RESULT_FILE"
+echo "" | tee -a "$RESULT_FILE"
+echo "To view results: cat $RESULT_FILE"
+echo "To analyze: less $RESULT_FILE"
diff --git a/cleanup-benchmark-data.sh b/cleanup-benchmark-data.sh
new file mode 100755
index 0000000..a5409f1
--- /dev/null
+++ b/cleanup-benchmark-data.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+# Cleanup script: Delete benchmark data from Prometheus
+# This uses the Prometheus Admin API to selectively remove benchmark metrics
+
+set -e
+
+PROMETHEUS_URL="${1:-http://localhost:9090}"
+
+echo "=== Prometheus Benchmark Data Cleanup ==="
+echo ""
+echo "Prometheus URL: $PROMETHEUS_URL"
+echo ""
+
+# Check if port-forward is needed
+if [[ "$PROMETHEUS_URL" == *"localhost"* ]]; then
+ echo "Note: Make sure you have port-forward running:"
+ echo " kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090"
+ echo ""
+fi
+
+# Metrics to delete
+METRICS=(
+ "epimetheus_benchmark_cpu_usage"
+ "epimetheus_benchmark_memory_bytes"
+ "epimetheus_benchmark_disk_io_bytes"
+ "epimetheus_benchmark_network_rx_bytes"
+ "epimetheus_benchmark_network_tx_bytes"
+ "epimetheus_benchmark_requests_total"
+ "epimetheus_benchmark_errors_total"
+ "epimetheus_benchmark_response_time_ms"
+ "epimetheus_benchmark_active_connections"
+ "epimetheus_benchmark_queue_depth"
+)
+
+echo "Step 1: Deleting benchmark metrics..."
+echo ""
+
+SUCCESS_COUNT=0
+ERROR_COUNT=0
+
+for METRIC in "${METRICS[@]}"; do
+ echo " Deleting: $METRIC"
+
+ # Delete series endpoint returns HTTP 204 No Content on success
+ HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST -g "${PROMETHEUS_URL}/api/v1/admin/tsdb/delete_series?match[]=${METRIC}")
+
+ if [ "$HTTP_CODE" == "204" ] || [ "$HTTP_CODE" == "200" ]; then
+ echo " ✅ Success (HTTP $HTTP_CODE)"
+ SUCCESS_COUNT=$((SUCCESS_COUNT + 1))
+ else
+ echo " ❌ Error: HTTP $HTTP_CODE"
+ ERROR_COUNT=$((ERROR_COUNT + 1))
+ fi
+done
+
+echo ""
+echo "Deletion summary: $SUCCESS_COUNT succeeded, $ERROR_COUNT failed"
+echo ""
+
+if [ $ERROR_COUNT -eq 0 ]; then
+ echo "Step 2: Cleaning up tombstones..."
+ echo ""
+
+ # Clean tombstones endpoint returns HTTP 204 No Content on success
+ CLEANUP_HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "${PROMETHEUS_URL}/api/v1/admin/tsdb/clean_tombstones")
+
+ if [ "$CLEANUP_HTTP_CODE" == "204" ] || [ "$CLEANUP_HTTP_CODE" == "200" ]; then
+ echo " ✅ Tombstones cleaned successfully (HTTP $CLEANUP_HTTP_CODE)"
+ echo ""
+ echo "🎉 Cleanup complete!"
+ echo ""
+ echo "Note: Prometheus may take a few moments to compact the database"
+ echo "and free up disk space."
+ else
+ echo " ❌ Error cleaning tombstones: HTTP $CLEANUP_HTTP_CODE"
+ exit 1
+ fi
+else
+ echo "⚠️ Some deletions failed. Skipping tombstone cleanup."
+ echo "Check Prometheus admin API is enabled with:"
+ echo " kubectl get prometheus -n monitoring prometheus-kube-prometheus-prometheus -o yaml | grep -A5 additionalArgs"
+ exit 1
+fi
+
+echo ""
+echo "To verify deletion, run:"
+echo " curl -s '${PROMETHEUS_URL}/api/v1/label/__name__/values' | jq '.data | map(select(startswith(\"epimetheus_benchmark\")))'"
+echo ""
diff --git a/cleanup-benchmark-metrics.sh b/cleanup-benchmark-metrics.sh
new file mode 100755
index 0000000..d70aa95
--- /dev/null
+++ b/cleanup-benchmark-metrics.sh
@@ -0,0 +1,83 @@
#!/bin/bash
# Cleanup benchmark metrics from Prometheus
# This allows running benchmarks from a clean state

set -e

echo "=== Prometheus Benchmark Metrics Cleanup ==="
echo ""

# Port-forward to Prometheus
echo "Setting up port-forward to Prometheus..."
kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/cleanup-pf.log 2>&1 &
PF_PID=$!
echo "Port-forward started (PID: $PF_PID)"

# Under `set -e` any failing command aborts the script immediately, which
# previously leaked the port-forward process. The EXIT trap guarantees it
# is torn down on every path (success, error, or interrupt).
trap 'kill $PF_PID 2>/dev/null || true' EXIT

sleep 5

# Check if Admin API is enabled.
# NOTE: the probe is itself a delete_series call, so when the API is enabled
# it already deletes epimetheus_benchmark_cpu_usage as a side effect.
# `-g` disables curl's URL globbing so the literal `match[]` brackets are
# sent as-is instead of being rejected as a malformed glob range.
echo ""
echo "Checking if Prometheus Admin API is enabled..."
ADMIN_CHECK=$(curl -s -o /dev/null -w "%{http_code}" -X POST -g "http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]=epimetheus_benchmark_cpu_usage")

if [ "$ADMIN_CHECK" = "204" ] || [ "$ADMIN_CHECK" = "200" ]; then
    echo "✅ Admin API is enabled"
    echo ""
    echo "Deleting benchmark metrics..."

    # Delete all benchmark metrics
    METRICS=(
        "epimetheus_benchmark_cpu_usage"
        "epimetheus_benchmark_memory_bytes"
        "epimetheus_benchmark_disk_io_bytes"
        "epimetheus_benchmark_network_rx_bytes"
        "epimetheus_benchmark_network_tx_bytes"
        "epimetheus_benchmark_requests_total"
        "epimetheus_benchmark_errors_total"
        "epimetheus_benchmark_response_time_ms"
        "epimetheus_benchmark_active_connections"
        "epimetheus_benchmark_queue_depth"
    )

    for METRIC in "${METRICS[@]}"; do
        echo " Deleting: $METRIC"
        # -g again: metric names are safe, but the match[] brackets are not.
        curl -s -X POST -g "http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]=$METRIC" > /dev/null
    done

    echo ""
    echo "Triggering tombstone cleanup (this removes deleted data from disk)..."
    curl -s -X POST "http://localhost:9090/api/v1/admin/tsdb/clean_tombstones" > /dev/null

    echo ""
    echo "✅ Cleanup complete!"
elif [ "$ADMIN_CHECK" = "405" ]; then
    echo "❌ Admin API is NOT enabled"
    echo ""
    echo "To enable the Admin API, update your Prometheus configuration:"
    echo ""
    echo "In f3s/prometheus/persistence-values.yaml, add:"
    echo ""
    echo "prometheus:"
    echo " prometheusSpec:"
    echo " additionalArgs:"
    echo " - name: web.enable-admin-api"
    echo " value: \"\""
    echo ""
    echo "Then upgrade Prometheus:"
    echo " cd /home/paul/git/conf/f3s/prometheus"
    echo " just upgrade"
    echo ""
    echo "WARNING: Admin API should only be enabled in development/test environments!"
    echo ""
    echo "Alternative: Delete benchmark data files manually:"
    echo " kubectl exec -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 -- sh -c 'rm -rf /prometheus/data/wal/*'"
    echo " kubectl delete pod -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0"
else
    echo "⚠️ Unexpected response: HTTP $ADMIN_CHECK"
fi

echo ""
echo "Cleaning up port-forward..."
# The EXIT trap handles this; an explicit, idempotent kill keeps the
# original user-visible flow intact.
kill $PF_PID 2>/dev/null || true

echo ""
echo "Done!"
diff --git a/generate-test-data.sh b/generate-test-data.sh
new file mode 100755
index 0000000..a4a0b1b
--- /dev/null
+++ b/generate-test-data.sh
@@ -0,0 +1,46 @@
#!/bin/bash

# Emit test-all-ages.csv: Prometheus samples whose timestamps span from
# "right now" back to roughly one month ago, so every ingestion path
# (realtime vs. historic) gets exercised.

NOW_MS=$(($(date +%s) * 1000))       # current time in milliseconds
TS_1H=$((NOW_MS - 3600000))          # one hour ago
TS_1D=$((NOW_MS - 86400000))         # one day ago
TS_1W=$((NOW_MS - 604800000))        # one week ago
TS_1M=$((NOW_MS - 2592000000))       # thirty days ago

cat > test-all-ages.csv << EOF
# Prometheus metrics in CSV format demonstrating all time ranges
# Format: metric_name,labels,value,timestamp_ms

# CURRENT data (< 5min old - will use Pushgateway/Realtime)
app_requests_total,instance=current;env=prod,100,$NOW_MS
app_temperature_celsius,instance=current;zone=us-east,22.5,$NOW_MS
app_active_connections,instance=current;env=prod,50,$NOW_MS

# 1 HOUR OLD data (will use Remote Write/Historic)
app_requests_total,instance=1h_ago;env=prod,95,$TS_1H
app_active_connections,instance=1h_ago;env=prod,45,$TS_1H
app_temperature_celsius,instance=1h_ago;zone=us-east,21.8,$TS_1H

# 1 DAY OLD data (will use Remote Write/Historic)
app_requests_total,instance=1d_ago;env=prod,150,$TS_1D
app_temperature_celsius,instance=1d_ago;zone=eu-west,18.3,$TS_1D
app_active_connections,instance=1d_ago;env=prod,60,$TS_1D

# 1 WEEK OLD data (will use Remote Write/Historic)
app_requests_total,instance=1w_ago;env=prod,200,$TS_1W
app_jobs_processed_total,instance=1w_ago;env=prod;job_type=email;status=success,75,$TS_1W
app_temperature_celsius,instance=1w_ago;zone=asia,25.2,$TS_1W

# 1 MONTH OLD data (will use Remote Write/Historic)
app_requests_total,instance=1m_ago;env=prod,180,$TS_1M
app_active_connections,instance=1m_ago;env=prod,30,$TS_1M
app_temperature_celsius,instance=1m_ago;zone=africa,28.7,$TS_1M
EOF

# Report each generated timestamp, both raw (ms) and human-readable.
echo "Generated test-all-ages.csv with the following timestamps:"
echo " Current: $NOW_MS ($(date -d @$((NOW_MS/1000)) '+%Y-%m-%d %H:%M:%S'))"
echo " 1h ago: $TS_1H ($(date -d @$((TS_1H/1000)) '+%Y-%m-%d %H:%M:%S'))"
echo " 1d ago: $TS_1D ($(date -d @$((TS_1D/1000)) '+%Y-%m-%d %H:%M:%S'))"
echo " 1w ago: $TS_1W ($(date -d @$((TS_1W/1000)) '+%Y-%m-%d %H:%M:%S'))"
echo " 1m ago: $TS_1M ($(date -d @$((TS_1M/1000)) '+%Y-%m-%d %H:%M:%S'))"
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..1d34c98
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,26 @@
+module epimetheus
+
+go 1.24.0
+
+require (
+ github.com/golang/snappy v1.0.0
+ github.com/magefile/mage v1.15.0
+ github.com/prometheus/client_golang v1.23.2
+ github.com/prometheus/prometheus v0.308.1
+)
+
+require (
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/gogo/protobuf v1.3.2 // indirect
+ github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 // indirect
+ github.com/kr/text v0.2.0 // indirect
+ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+ github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/common v0.67.4 // indirect
+ github.com/prometheus/procfs v0.16.1 // indirect
+ go.yaml.in/yaml/v2 v2.4.3 // indirect
+ golang.org/x/sys v0.37.0 // indirect
+ golang.org/x/text v0.30.0 // indirect
+ google.golang.org/protobuf v1.36.10 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..fcafa41
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,83 @@
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
+github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 h1:cLN4IBkmkYZNnk7EAJ0BHIethd+J6LqxFNw5mSiI2bM=
+github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
+github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
+github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
+github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
+github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
+github.com/prometheus/common v0.67.4 h1:yR3NqWO1/UyO1w2PhUvXlGQs/PtFmoveVO0KZ4+Lvsc=
+github.com/prometheus/common v0.67.4/go.mod h1:gP0fq6YjjNCLssJCQp0yk4M8W6ikLURwkdd/YKtTbyI=
+github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
+github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/prometheus/prometheus v0.308.1 h1:ApMNI/3/es3Ze90Z7CMb+wwU2BsSYur0m5VKeqHj7h4=
+github.com/prometheus/prometheus v0.308.1/go.mod h1:aHjYCDz9zKRyoUXvMWvu13K9XHOkBB12XrEqibs3e0A=
+github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
+go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
+golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
+golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
+google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 0000000..dea804b
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,57 @@
+package config
+
+import "time"
+
// Mode selects how metrics are delivered to their destination.
type Mode string

// Supported ingestion modes.
const (
	ModeRealtime Mode = "realtime" // push via Pushgateway, stamped with "now"
	ModeHistoric Mode = "historic" // write timestamped samples via Remote Write
	ModeBackfill Mode = "backfill" // backfill mode
	ModeAuto     Mode = "auto"     // route per-sample by timestamp age
	ModeWatch    Mode = "watch"    // watch mode
)

// Config holds all configuration for the prometheus-pusher.
type Config struct {
	Mode            Mode     // ingestion mode (see Mode constants)
	PushgatewayURL  string   // Pushgateway base URL
	PrometheusURL   string   // Prometheus remote-write endpoint
	ClickHouseURL   string   // ClickHouse HTTP URL (e.g. http://localhost:8123)
	ClickHouseTable string   // ClickHouse table name (default: epimetheus_metrics)
	JobName         string   // Pushgateway job name
	Continuous      bool     // continuous-run flag
	InputFile       string   // input file path
	InputFormat     string   // input format (e.g. "csv")
	MetricName      string   // metric name selector
	HoursAgo        int      // hours in the past, used by historic generation
	StartHours      int      // start of the backfill window, hours ago
	EndHours        int      // end of the backfill window, hours ago
	Interval        int      // sample interval
	ResolveIPLabels []string // labels containing IP addresses to resolve via DNS
}
+
+// NewConfig creates a new Config with default values
+func NewConfig() Config {
+ return Config{
+ Mode: ModeRealtime,
+ PushgatewayURL: "http://localhost:9091",
+ PrometheusURL: "http://localhost:9090/api/v1/write",
+ ClickHouseURL: "",
+ ClickHouseTable: "epimetheus_metrics",
+ JobName: "example_metrics_pusher",
+ InputFormat: "csv",
+ HoursAgo: 24,
+ StartHours: 48,
+ EndHours: 0,
+ Interval: 1,
+ ResolveIPLabels: []string{"ip"}, // Default resolves 'ip' label
+ }
+}
+
+// AutoIngestThreshold is the age threshold for auto mode routing
+const AutoIngestThreshold = 5 * time.Minute
+
+// DefaultHTTPTimeout is the default timeout for HTTP requests
+const DefaultHTTPTimeout = 10 * time.Second
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
new file mode 100644
index 0000000..da073c4
--- /dev/null
+++ b/internal/config/config_test.go
@@ -0,0 +1,52 @@
+package config
+
+import (
+ "testing"
+ "time"
+)
+
+func TestNewConfig(t *testing.T) {
+ cfg := NewConfig()
+
+ if cfg.Mode != ModeRealtime {
+ t.Errorf("Default mode = %v, want %v", cfg.Mode, ModeRealtime)
+ }
+ if cfg.PushgatewayURL != "http://localhost:9091" {
+ t.Errorf("Default PushgatewayURL = %v, want http://localhost:9091", cfg.PushgatewayURL)
+ }
+ if cfg.PrometheusURL != "http://localhost:9090/api/v1/write" {
+ t.Errorf("Default PrometheusURL = %v, want http://localhost:9090/api/v1/write", cfg.PrometheusURL)
+ }
+ if cfg.JobName != "example_metrics_pusher" {
+ t.Errorf("Default JobName = %v, want example_metrics_pusher", cfg.JobName)
+ }
+ if cfg.InputFormat != "csv" {
+ t.Errorf("Default InputFormat = %v, want csv", cfg.InputFormat)
+ }
+ if cfg.HoursAgo != 24 {
+ t.Errorf("Default HoursAgo = %v, want 24", cfg.HoursAgo)
+ }
+ if cfg.Interval != 1 {
+ t.Errorf("Default Interval = %v, want 1", cfg.Interval)
+ }
+}
+
+func TestModeConstants(t *testing.T) {
+ modes := []Mode{ModeRealtime, ModeHistoric, ModeBackfill, ModeAuto}
+ expected := []string{"realtime", "historic", "backfill", "auto"}
+
+ for i, mode := range modes {
+ if string(mode) != expected[i] {
+ t.Errorf("Mode constant %d = %v, want %v", i, mode, expected[i])
+ }
+ }
+}
+
+func TestConstants(t *testing.T) {
+ if AutoIngestThreshold != 5*time.Minute {
+ t.Errorf("AutoIngestThreshold = %v, want 5m", AutoIngestThreshold)
+ }
+ if DefaultHTTPTimeout != 10*time.Second {
+ t.Errorf("DefaultHTTPTimeout = %v, want 10s", DefaultHTTPTimeout)
+ }
+}
diff --git a/internal/ingester/auto.go b/internal/ingester/auto.go
new file mode 100644
index 0000000..315d767
--- /dev/null
+++ b/internal/ingester/auto.go
@@ -0,0 +1,145 @@
+package ingester
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "time"
+
+ "epimetheus/internal/config"
+ "epimetheus/internal/metrics"
+)
+
+const ageThreshold = 5 * time.Minute
+
+// DetermineMode automatically determines which ingestion mode to use based on timestamp age.
+// Data older than 5 minutes uses historic mode (Remote Write), newer data uses realtime mode (Pushgateway).
+func DetermineMode(timestamp time.Time) config.Mode {
+ age := time.Since(timestamp)
+ if age > ageThreshold {
+ return config.ModeHistoric
+ }
+ return config.ModeRealtime
+}
+
+// AutoIngester handles automatic ingestion by routing samples to appropriate ingesters.
+type AutoIngester struct {
+ pushgateway PushgatewayIngester
+ remoteWrite RemoteWriteIngester
+ collectors metrics.Collectors
+}
+
+// NewAutoIngester creates a new auto ingester.
+func NewAutoIngester(collectors metrics.Collectors) AutoIngester {
+ return AutoIngester{
+ pushgateway: NewPushgatewayIngester(),
+ remoteWrite: NewRemoteWriteIngester(),
+ collectors: collectors,
+ }
+}
+
+// Ingest automatically routes samples to appropriate ingestion method based on timestamp age.
+func (a AutoIngester) Ingest(ctx context.Context, samples []metrics.Sample, cfg config.Config) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ realtimeSamples, historicSamples := groupSamplesByMode(samples)
+
+ logIngestSummary(len(samples), len(realtimeSamples), len(historicSamples))
+
+ if len(realtimeSamples) > 0 {
+ if err := a.ingestRealtime(ctx, cfg); err != nil {
+ return fmt.Errorf("failed to ingest realtime samples: %w", err)
+ }
+ }
+
+ if len(historicSamples) > 0 {
+ if err := a.ingestHistoric(ctx, historicSamples, cfg); err != nil {
+ return fmt.Errorf("failed to ingest historic samples: %w", err)
+ }
+ }
+
+ log.Printf("\n🎉 Auto-ingest complete!")
+ return nil
+}
+
+// groupSamplesByMode separates samples into realtime and historic groups.
+func groupSamplesByMode(samples []metrics.Sample) (realtime, historic []metrics.Sample) {
+ realtimeSamples := make([]metrics.Sample, 0)
+ historicSamples := make([]metrics.Sample, 0)
+
+ for _, sample := range samples {
+ if DetermineMode(sample.Timestamp) == config.ModeRealtime {
+ realtimeSamples = append(realtimeSamples, sample)
+ } else {
+ historicSamples = append(historicSamples, sample)
+ }
+ }
+
+ return realtimeSamples, historicSamples
+}
+
// logIngestSummary prints a short breakdown of how the batch was routed.
func logIngestSummary(totalCount, realtimeCount, historicCount int) {
	log.Printf("📊 Auto-ingest summary:")
	log.Printf(" Total samples: %d", totalCount)
	log.Printf(" Realtime samples (< 5min old): %d", realtimeCount)
	log.Printf(" Historic samples (> 5min old): %d", historicCount)
}
+
+// ingestRealtime ingests realtime samples via Pushgateway.
+func (a AutoIngester) ingestRealtime(ctx context.Context, cfg config.Config) error {
+ log.Printf("\n🔄 Ingesting REALTIME samples via Pushgateway...")
+ log.Printf(" Note: Pushgateway uses current timestamp (original timestamps ignored)")
+
+ if err := a.pushgateway.Ingest(ctx, a.collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
+ return err
+ }
+
+ log.Printf("✅ Successfully ingested realtime samples")
+ return nil
+}
+
+// ingestHistoric ingests historic samples via Remote Write.
+func (a AutoIngester) ingestHistoric(ctx context.Context, samples []metrics.Sample, cfg config.Config) error {
+ log.Printf("\n⏰ Ingesting %d HISTORIC samples via Remote Write...", len(samples))
+
+ // Log a few sample details instead of all samples
+ samplesToLog := 3
+ if len(samples) < samplesToLog {
+ samplesToLog = len(samples)
+ }
+
+ for i := 0; i < samplesToLog; i++ {
+ age := time.Since(samples[i].Timestamp)
+ log.Printf(" Sample %d: %s (age: %s)", i+1, samples[i].MetricName, formatDuration(age))
+ }
+
+ if len(samples) > samplesToLog {
+ // Show oldest and newest sample ages
+ oldestAge := time.Since(samples[0].Timestamp)
+ newestAge := time.Since(samples[len(samples)-1].Timestamp)
+ log.Printf(" ... (%d more samples)", len(samples)-samplesToLog)
+ log.Printf(" Age range: %s (oldest) to %s (newest)", formatDuration(oldestAge), formatDuration(newestAge))
+ }
+
+ if err := a.remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil {
+ return err
+ }
+
+ log.Printf("✅ Successfully ingested %d historic samples", len(samples))
+ return nil
+}
+
+// formatDuration formats a duration in human-readable form.
+func formatDuration(d time.Duration) string {
+ if d < time.Minute {
+ return fmt.Sprintf("%.0f seconds", d.Seconds())
+ } else if d < time.Hour {
+ return fmt.Sprintf("%.0f minutes", d.Minutes())
+ } else if d < 24*time.Hour {
+ return fmt.Sprintf("%.1f hours", d.Hours())
+ }
+ return fmt.Sprintf("%.1f days", d.Hours()/24)
+}
diff --git a/internal/ingester/auto_test.go b/internal/ingester/auto_test.go
new file mode 100644
index 0000000..fc1c423
--- /dev/null
+++ b/internal/ingester/auto_test.go
@@ -0,0 +1,164 @@
+package ingester
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "epimetheus/internal/config"
+ "epimetheus/internal/metrics"
+)
+
+func TestDetermineMode(t *testing.T) {
+ tests := []struct {
+ name string
+ timestamp time.Time
+ want config.Mode
+ }{
+ {
+ name: "current time is realtime",
+ timestamp: time.Now(),
+ want: config.ModeRealtime,
+ },
+ {
+ name: "1 minute ago is realtime",
+ timestamp: time.Now().Add(-1 * time.Minute),
+ want: config.ModeRealtime,
+ },
+ {
+ name: "4 minutes ago is realtime",
+ timestamp: time.Now().Add(-4 * time.Minute),
+ want: config.ModeRealtime,
+ },
+ {
+ name: "6 minutes ago is historic",
+ timestamp: time.Now().Add(-6 * time.Minute),
+ want: config.ModeHistoric,
+ },
+ {
+ name: "1 hour ago is historic",
+ timestamp: time.Now().Add(-1 * time.Hour),
+ want: config.ModeHistoric,
+ },
+ {
+ name: "1 day ago is historic",
+ timestamp: time.Now().Add(-24 * time.Hour),
+ want: config.ModeHistoric,
+ },
+ {
+ name: "exactly 5 minutes is historic (edge case)",
+ timestamp: time.Now().Add(-5 * time.Minute),
+ want: config.ModeHistoric,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := DetermineMode(tt.timestamp)
+ if got != tt.want {
+ age := time.Since(tt.timestamp)
+ t.Errorf("DetermineMode() = %v, want %v (age: %v)", got, tt.want, age)
+ }
+ })
+ }
+}
+
+func TestGroupSamplesByMode(t *testing.T) {
+ now := time.Now()
+ samples := []metrics.Sample{
+ {MetricName: "metric1", Timestamp: now.Add(-1 * time.Minute)}, // realtime
+ {MetricName: "metric2", Timestamp: now.Add(-2 * time.Minute)}, // realtime
+ {MetricName: "metric3", Timestamp: now.Add(-10 * time.Minute)}, // historic
+ {MetricName: "metric4", Timestamp: now.Add(-1 * time.Hour)}, // historic
+ {MetricName: "metric5", Timestamp: now.Add(-30 * time.Second)}, // realtime
+ }
+
+ realtime, historic := groupSamplesByMode(samples)
+
+ if len(realtime) != 3 {
+ t.Errorf("Got %d realtime samples, want 3", len(realtime))
+ }
+ if len(historic) != 2 {
+ t.Errorf("Got %d historic samples, want 2", len(historic))
+ }
+
+ // Verify correct grouping
+ for _, s := range realtime {
+ if DetermineMode(s.Timestamp) != config.ModeRealtime {
+ t.Errorf("Sample %s incorrectly grouped as realtime (age: %v)", s.MetricName, s.Age())
+ }
+ }
+ for _, s := range historic {
+ if DetermineMode(s.Timestamp) != config.ModeHistoric {
+ t.Errorf("Sample %s incorrectly grouped as historic (age: %v)", s.MetricName, s.Age())
+ }
+ }
+}
+
+func TestFormatDuration(t *testing.T) {
+ tests := []struct {
+ name string
+ duration time.Duration
+ want string
+ }{
+ {
+ name: "seconds",
+ duration: 45 * time.Second,
+ want: "45 seconds",
+ },
+ {
+ name: "minutes",
+ duration: 5 * time.Minute,
+ want: "5 minutes",
+ },
+ {
+ name: "hours",
+ duration: 2*time.Hour + 30*time.Minute,
+ want: "2.5 hours",
+ },
+ {
+ name: "days",
+ duration: 36 * time.Hour,
+ want: "1.5 days",
+ },
+ {
+ name: "less than minute",
+ duration: 30 * time.Second,
+ want: "30 seconds",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := formatDuration(tt.duration)
+ if got != tt.want {
+ t.Errorf("formatDuration() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestAutoIngester_Ingest_EmptySamples(t *testing.T) {
+ collectors := metrics.NewCollectors()
+ autoIngester := NewAutoIngester(collectors)
+ ctx := context.Background()
+ cfg := config.NewConfig()
+
+ err := autoIngester.Ingest(ctx, []metrics.Sample{}, cfg)
+ if err == nil {
+ t.Error("Expected error for empty samples, got nil")
+ }
+ if err.Error() != "no samples to ingest" {
+ t.Errorf("Expected 'no samples to ingest' error, got: %v", err)
+ }
+}
+
+func TestAutoIngester_New(t *testing.T) {
+ collectors := metrics.NewCollectors()
+ ingester := NewAutoIngester(collectors)
+
+ // Verify ingester was created with components
+ if ingester.collectors.RequestsTotal == nil {
+ t.Error("AutoIngester.collectors not initialized properly")
+ }
+}
diff --git a/internal/ingester/clickhouse.go b/internal/ingester/clickhouse.go
new file mode 100644
index 0000000..2439d4e
--- /dev/null
+++ b/internal/ingester/clickhouse.go
@@ -0,0 +1,191 @@
+package ingester
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
// Tunables for the ClickHouse ingestion path.
const (
	clickhouseBatchSize   = 10000            // samples per INSERT batch
	clickhouseHTTPTimeout = 30 * time.Second // per-request HTTP timeout
	defaultTableName      = "epimetheus_metrics"
)

// ClickHouseIngester ingests metrics into ClickHouse via its HTTP interface.
type ClickHouseIngester struct {
	client    *http.Client // shared client with clickhouseHTTPTimeout applied
	baseURL   string       // ClickHouse HTTP endpoint, no trailing slash
	tableName string       // destination table
}
+
+// NewClickHouseIngester creates a new ClickHouse ingester.
+func NewClickHouseIngester(baseURL, tableName string) ClickHouseIngester {
+ if tableName == "" {
+ tableName = defaultTableName
+ }
+ // Ensure URL has scheme and no trailing slash for query params
+ baseURL = strings.TrimSuffix(strings.TrimSpace(baseURL), "/")
+
+ return ClickHouseIngester{
+ client: &http.Client{
+ Timeout: clickhouseHTTPTimeout,
+ },
+ baseURL: baseURL,
+ tableName: tableName,
+ }
+}
+
+// EnsureTable creates the metrics table if it does not exist.
+func (c ClickHouseIngester) EnsureTable(ctx context.Context) error {
+ createSQL := fmt.Sprintf(`
+ CREATE TABLE IF NOT EXISTS %s (
+ metric String,
+ labels Map(String, String),
+ value Float64,
+ timestamp DateTime64(3)
+ ) ENGINE = MergeTree()
+ ORDER BY (metric, timestamp)
+ `, c.tableName)
+
+ reqURL := c.baseURL + "/?query=" + url.QueryEscape(createSQL)
+ req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil)
+ if err != nil {
+ return fmt.Errorf("create request: %w", err)
+ }
+
+ resp, err := c.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("execute create table: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("create table failed (status %d): %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+// Ingest inserts samples into ClickHouse in batches.
+func (c ClickHouseIngester) Ingest(ctx context.Context, samples []metrics.Sample) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ if err := c.EnsureTable(ctx); err != nil {
+ return fmt.Errorf("ensure table: %w", err)
+ }
+
+ batches := chunkSamplesForClickHouse(samples, clickhouseBatchSize)
+ totalBatches := len(batches)
+
+ log.Printf("ClickHouse: ingesting %d samples in %d batches", len(samples), totalBatches)
+
+ var wg sync.WaitGroup
+ errChan := make(chan error, totalBatches)
+
+ for i, batch := range batches {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ wg.Add(1)
+ go func(idx int, b []metrics.Sample) {
+ defer wg.Done()
+ if err := c.insertBatch(ctx, b); err != nil {
+ errChan <- fmt.Errorf("batch %d: %w", idx+1, err)
+ }
+ }(i, batch)
+ }
+
+ wg.Wait()
+ close(errChan)
+
+ var firstErr error
+ for err := range errChan {
+ if firstErr == nil {
+ firstErr = err
+ }
+ log.Printf("ClickHouse batch error: %v", err)
+ }
+
+ return firstErr
+}
+
+// insertBatch sends a single batch via HTTP JSONEachRow.
+func (c ClickHouseIngester) insertBatch(ctx context.Context, samples []metrics.Sample) error {
+ var buf bytes.Buffer
+ enc := json.NewEncoder(&buf)
+
+ for _, s := range samples {
+ // Convert labels map to format ClickHouse Map expects
+ labelsMap := make(map[string]string, len(s.Labels))
+ for k, v := range s.Labels {
+ labelsMap[k] = v
+ }
+
+ row := struct {
+ Metric string `json:"metric"`
+ Labels map[string]string `json:"labels"`
+ Value float64 `json:"value"`
+ Timestamp string `json:"timestamp"`
+ }{
+ Metric: s.MetricName,
+ Labels: labelsMap,
+ Value: s.Value,
+ Timestamp: s.Timestamp.UTC().Format("2006-01-02 15:04:05.000"),
+ }
+
+ if err := enc.Encode(row); err != nil {
+ return fmt.Errorf("encode row: %w", err)
+ }
+ }
+
+ query := fmt.Sprintf("INSERT INTO %s FORMAT JSONEachRow", c.tableName)
+ reqURL := c.baseURL + "/?query=" + url.QueryEscape(query)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf)
+ if err != nil {
+ return fmt.Errorf("create request: %w", err)
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := c.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("send request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("insert failed (status %d): %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+func chunkSamplesForClickHouse(samples []metrics.Sample, size int) [][]metrics.Sample {
+ var batches [][]metrics.Sample
+ for i := 0; i < len(samples); i += size {
+ end := i + size
+ if end > len(samples) {
+ end = len(samples)
+ }
+ batches = append(batches, samples[i:end])
+ }
+ return batches
+}
diff --git a/internal/ingester/pushgateway.go b/internal/ingester/pushgateway.go
new file mode 100644
index 0000000..c5c80c3
--- /dev/null
+++ b/internal/ingester/pushgateway.go
@@ -0,0 +1,51 @@
+package ingester
+
+import (
+ "context"
+ "fmt"
+
+ "epimetheus/internal/metrics"
+
+ "github.com/prometheus/client_golang/prometheus/push"
+)
+
// PushgatewayIngester handles realtime metric ingestion via Pushgateway.
//
// Note: Pushgateway does not preserve custom timestamps - all metrics are
// timestamped with the current time when pushed. The type carries no state
// (empty struct), so it is safe to copy and the zero value is ready to use.
type PushgatewayIngester struct{}
+
+// NewPushgatewayIngester creates a new Pushgateway ingester.
+func NewPushgatewayIngester() PushgatewayIngester {
+ return PushgatewayIngester{}
+}
+
+// Ingest pushes metrics to Pushgateway.
+// The samples parameter is currently ignored because Pushgateway doesn't support
+// custom metric values from samples - it uses registered Prometheus collectors.
+// This ingests generated metrics using the provided collectors.
+func (i PushgatewayIngester) Ingest(ctx context.Context, collectors metrics.Collectors, url, jobName string) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ // Generate random metric values
+ collectors.Simulate()
+
+ // Create pusher with all collectors
+ pusher := push.New(url, jobName).
+ Collector(collectors.RequestsTotal).
+ Collector(collectors.ActiveConnections).
+ Collector(collectors.TemperatureCelsius).
+ Collector(collectors.RequestDuration).
+ Collector(collectors.JobsProcessed).
+ Grouping("instance", "example-app")
+
+ // Push metrics to Pushgateway
+ if err := pusher.Push(); err != nil {
+ return fmt.Errorf("failed to push to pushgateway: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/ingester/pushgateway_test.go b/internal/ingester/pushgateway_test.go
new file mode 100644
index 0000000..55fc213
--- /dev/null
+++ b/internal/ingester/pushgateway_test.go
@@ -0,0 +1,28 @@
+package ingester
+
+import (
+ "testing"
+
+ "epimetheus/internal/metrics"
+)
+
+func TestNewPushgatewayIngester(t *testing.T) {
+ ingester := NewPushgatewayIngester()
+
+ // Verify the ingester was created (value type, so no nil check needed)
+ _ = ingester
+}
+
+func TestPushgatewayIngester_Type(t *testing.T) {
+ // Test that we can create and use the ingester
+ collectors := metrics.NewCollectors()
+ ingester := NewPushgatewayIngester()
+
+ // The ingester should work with collectors
+ if collectors.RequestsTotal == nil {
+ t.Error("Collectors not initialized properly")
+ }
+
+ // Verify ingester is the correct type
+ _ = ingester
+}
diff --git a/internal/ingester/remotewrite.go b/internal/ingester/remotewrite.go
new file mode 100644
index 0000000..b88b7fa
--- /dev/null
+++ b/internal/ingester/remotewrite.go
@@ -0,0 +1,455 @@
+package ingester
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "epimetheus/internal/metrics"
+
+ "github.com/golang/snappy"
+ "github.com/prometheus/prometheus/prompb"
+)
+
const (
	// requestTimeout bounds each individual Remote Write HTTP request.
	requestTimeout = 10 * time.Second
	// backfillDelay is the pause inserted between consecutive backfill pushes.
	backfillDelay = 100 * time.Millisecond
	// BatchSize defines the maximum number of samples per Remote Write request
	// Prometheus has a 32MB limit, so we keep batches small to avoid rejection
	BatchSize = 5000
	// NumWorkers defines the number of concurrent goroutines for batch processing
	// Higher values increase throughput but may overwhelm Prometheus
	NumWorkers = 10
)
+
// bufferPool reuses *bytes.Buffer values for staging HTTP request bodies
// across requests (reduces GC pressure).
var bufferPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}
+
// protoBufferPool holds preallocated byte slices used as Snappy compression
// scratch space when marshaling write requests.
var protoBufferPool = sync.Pool{
	New: func() interface{} {
		// Preallocate ~500KB buffer (typical batch size)
		buf := make([]byte, 0, 512*1024)
		return &buf
	},
}
+
// timeSeriesPool is a pool of reusable *prompb.TimeSeries values.
// NOTE(review): nothing in this file takes from this pool; it appears unused
// unless another file in the package references it — verify before removing.
var timeSeriesPool = sync.Pool{
	New: func() interface{} {
		return &prompb.TimeSeries{
			Labels:  make([]prompb.Label, 0, 10), // Typical label count
			Samples: make([]prompb.Sample, 0, 1),
		}
	},
}
+
// labelSlicePool provides scratch label slices for assembling each time
// series' label set before it is copied into its final slice.
var labelSlicePool = sync.Pool{
	New: func() interface{} {
		labels := make([]prompb.Label, 0, 10)
		return &labels
	},
}
+
// RemoteWriteIngester handles historic metric ingestion via Prometheus Remote Write API.
// This ingester preserves custom timestamps, making it suitable for importing historic data.
type RemoteWriteIngester struct {
	// client is the shared HTTP client; its transport is tuned for
	// keep-alive reuse across the worker pool (see NewRemoteWriteIngester).
	client *http.Client
}
+
+// NewRemoteWriteIngester creates a new Remote Write ingester with optimized HTTP client.
+func NewRemoteWriteIngester() RemoteWriteIngester {
+ // Optimized HTTP transport with connection pooling
+ transport := &http.Transport{
+ MaxIdleConns: 100, // Global connection pool
+ MaxIdleConnsPerHost: NumWorkers, // Match worker count
+ MaxConnsPerHost: NumWorkers, // Limit concurrent connections per host
+ IdleConnTimeout: 90 * time.Second, // Keep connections alive longer
+ DisableKeepAlives: false, // CRITICAL: enable keep-alive
+ DisableCompression: true, // We compress manually with Snappy
+ ForceAttemptHTTP2: true, // Use HTTP/2 if available
+ WriteBufferSize: 64 * 1024, // 64KB write buffer
+ ReadBufferSize: 64 * 1024, // 64KB read buffer
+ }
+
+ return RemoteWriteIngester{
+ client: &http.Client{
+ Timeout: requestTimeout,
+ Transport: transport,
+ },
+ }
+}
+
+// Ingest sends samples to Prometheus via Remote Write API with batching and concurrency.
+// Large datasets are automatically split into batches and processed by a worker pool
+// to avoid exceeding Prometheus's 32MB limit and maximize throughput.
+func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error {
+ if len(samples) == 0 {
+ return fmt.Errorf("no samples to ingest")
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ // Split samples into batches
+ batches := chunkSamples(samples, BatchSize)
+ totalBatches := len(batches)
+
+ log.Printf("Splitting %d samples into %d batches (batch size: %d)", len(samples), totalBatches, BatchSize)
+ log.Printf("Processing batches with %d concurrent workers", NumWorkers)
+
+ // Counters for tracking progress (atomic for thread safety)
+ var successCount int32
+ var errorCount int32
+ var processedCount int32
+
+ // Worker pool pattern
+ batchChan := make(chan batchJob, totalBatches)
+ errorsChan := make(chan error, totalBatches)
+ var wg sync.WaitGroup
+
+ // Start workers
+ for w := 0; w < NumWorkers; w++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ for job := range batchChan {
+ // Check context before processing
+ select {
+ case <-ctx.Done():
+ errorsChan <- fmt.Errorf("worker %d cancelled", workerID)
+ return
+ default:
+ }
+
+ // Convert and send batch
+ timeSeries := convertSamplesToTimeSeries(job.batch)
+ writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}
+
+ if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil {
+ atomic.AddInt32(&errorCount, 1)
+ errorsChan <- fmt.Errorf("batch %d: %w", job.index, err)
+ } else {
+ atomic.AddInt32(&successCount, 1)
+ }
+
+ // Update progress
+ processed := atomic.AddInt32(&processedCount, 1)
+ if processed%10 == 0 || int(processed) == totalBatches {
+ progress := float64(processed) / float64(totalBatches) * 100
+ log.Printf("Progress: %.1f%% (%d/%d batches, %d success, %d errors)",
+ progress, processed, totalBatches, atomic.LoadInt32(&successCount), atomic.LoadInt32(&errorCount))
+ }
+ }
+ }(w)
+ }
+
+ // Send batches to workers
+ for idx, batch := range batches {
+ batchChan <- batchJob{index: idx + 1, batch: batch}
+ }
+ close(batchChan)
+
+ // Wait for all workers to finish
+ wg.Wait()
+ close(errorsChan)
+
+ // Collect errors
+ var firstError error
+ errorList := make([]error, 0)
+ for err := range errorsChan {
+ errorList = append(errorList, err)
+ if firstError == nil {
+ firstError = err
+ }
+ }
+
+ finalSuccess := int(atomic.LoadInt32(&successCount))
+ finalErrors := int(atomic.LoadInt32(&errorCount))
+
+ log.Printf("Batch ingestion complete: %d successful, %d errors", finalSuccess, finalErrors)
+
+ if finalErrors > 0 {
+ // Log first few errors as examples
+ numToLog := 5
+ if len(errorList) < numToLog {
+ numToLog = len(errorList)
+ }
+ log.Printf("Sample errors (showing %d of %d):", numToLog, finalErrors)
+ for i := 0; i < numToLog; i++ {
+ log.Printf(" - %v", errorList[i])
+ }
+ return fmt.Errorf("completed with %d/%d batches failed", finalErrors, totalBatches)
+ }
+
+ return nil
+}
+
// batchJob represents a batch to be processed by a worker.
type batchJob struct {
	// index is the 1-based batch number, used in error and progress messages.
	index int
	// batch holds the samples belonging to this batch.
	batch []metrics.Sample
}
+
+// chunkSamples splits samples into batches of the specified size.
+func chunkSamples(samples []metrics.Sample, batchSize int) [][]metrics.Sample {
+ var batches [][]metrics.Sample
+
+ for i := 0; i < len(samples); i += batchSize {
+ end := i + batchSize
+ if end > len(samples) {
+ end = len(samples)
+ }
+ batches = append(batches, samples[i:end])
+ }
+
+ return batches
+}
+
+// IngestHistoric generates and ingests historic metrics for a specific time in the past.
+func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error {
+ timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour)
+ timeSeries := generateHistoricTimeSeries(timestamp)
+ writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}
+
+ if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil {
+ return err
+ }
+
+ log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)",
+ hoursAgo, timestamp.Format(time.RFC3339))
+ return nil
+}
+
+// Backfill ingests historic metrics for a range of time points.
+func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error {
+ log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)",
+ startHoursAgo, endHoursAgo, intervalHours)
+
+ successCount := 0
+ errorCount := 0
+
+ for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours {
+ if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil {
+ log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err)
+ errorCount++
+ } else {
+ successCount++
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(backfillDelay):
+ }
+ }
+
+ log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount)
+
+ if errorCount > 0 {
+ return fmt.Errorf("backfill completed with %d errors", errorCount)
+ }
+
+ return nil
+}
+
+// sendWriteRequest sends a write request to Prometheus using pooled buffers.
+func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error {
+ // Get protobuf buffer from pool
+ protoBufPtr := protoBufferPool.Get().(*[]byte)
+ protoBuf := (*protoBufPtr)[:0] // Reset length but keep capacity
+ defer protoBufferPool.Put(protoBufPtr)
+
+ // Marshal into pooled buffer
+ data, err := writeRequest.Marshal()
+ if err != nil {
+ return fmt.Errorf("failed to marshal write request: %w", err)
+ }
+
+ // Compress using pooled buffer
+ compressed := snappy.Encode(protoBuf, data)
+
+ // Get request buffer from pool
+ buf := bufferPool.Get().(*bytes.Buffer)
+ buf.Reset()
+ defer bufferPool.Put(buf)
+
+ buf.Write(compressed)
+
+ req, err := http.NewRequestWithContext(ctx, "POST", url, buf)
+ if err != nil {
+ return fmt.Errorf("failed to create HTTP request: %w", err)
+ }
+
+ req.Header.Set("Content-Type", "application/x-protobuf")
+ req.Header.Set("Content-Encoding", "snappy")
+ req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+ req.Header.Set("Content-Length", fmt.Sprintf("%d", buf.Len()))
+
+ resp, err := i.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("failed to send remote write request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body))
+ }
+
+ return nil
+}
+
+// convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format with pooling.
+func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries {
+ // Preallocate with exact capacity to avoid reallocation
+ timeSeries := make([]prompb.TimeSeries, 0, len(samples))
+
+ // Reusable label slice
+ labelsPtr := labelSlicePool.Get().(*[]prompb.Label)
+ labels := *labelsPtr
+ defer labelSlicePool.Put(labelsPtr)
+
+ for i := range samples {
+ sample := &samples[i] // Avoid copying
+
+ // Reset labels slice for reuse
+ labels = labels[:0]
+
+ // Add __name__ label
+ labels = append(labels, prompb.Label{Name: "__name__", Value: sample.MetricName})
+
+ // Add custom labels
+ for k, v := range sample.Labels {
+ labels = append(labels, prompb.Label{Name: k, Value: v})
+ }
+
+ // Copy labels (must not share slice across time series)
+ labelsCopy := make([]prompb.Label, len(labels))
+ copy(labelsCopy, labels)
+
+ // Create time series (reuse pattern, but we need unique objects)
+ timeSeries = append(timeSeries, prompb.TimeSeries{
+ Labels: labelsCopy,
+ Samples: []prompb.Sample{{
+ Value: sample.Value,
+ Timestamp: sample.Timestamp.UnixMilli(),
+ }},
+ })
+ }
+
+ return timeSeries
+}
+
+// generateHistoricTimeSeries generates example time series for a specific timestamp.
+func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries {
+ timestampMs := timestamp.UnixMilli()
+ var timeSeries []prompb.TimeSeries
+
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "example-app"},
+ {Name: "job", Value: "historic_data"},
+ }
+
+ timeSeries = append(timeSeries, createCounterSeries("epimetheus_test_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs))
+ timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs))
+ timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs))
+
+ timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...)
+ timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...)
+
+ return timeSeries
+}
+
+// createCounterSeries creates a counter metric time series.
+func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries {
+ labels := []prompb.Label{{Name: "__name__", Value: name}}
+ labels = append(labels, baseLabels...)
+
+ return prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}},
+ }
+}
+
// createGaugeSeries creates a gauge metric time series. Gauges and counters
// have the same wire representation in Remote Write, so this delegates to
// createCounterSeries; the separate name exists for readability at call sites.
func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries {
	return createCounterSeries(name, baseLabels, value, timestamp)
}
+
+// generateHistogramSeries generates histogram bucket time series.
+func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries {
+ buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}
+ var series []prompb.TimeSeries
+
+ cumulativeCount := 0
+ for _, bucket := range buckets {
+ cumulativeCount += rand.Intn(5)
+ labels := []prompb.Label{
+ {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"},
+ {Name: "le", Value: fmt.Sprintf("%g", bucket)},
+ }
+ labels = append(labels, baseLabels...)
+
+ series = append(series, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}},
+ })
+ }
+
+ infLabels := []prompb.Label{
+ {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"},
+ {Name: "le", Value: "+Inf"},
+ }
+ infLabels = append(infLabels, baseLabels...)
+ series = append(series, prompb.TimeSeries{
+ Labels: infLabels,
+ Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}},
+ })
+
+ series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp))
+ series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp))
+
+ return series
+}
+
+// generateLabeledCounterSeries generates labeled counter time series.
+func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries {
+ jobTypes := []string{"email", "report", "backup"}
+ statuses := []string{"success", "failed"}
+ var series []prompb.TimeSeries
+
+ for _, jobType := range jobTypes {
+ for _, status := range statuses {
+ labels := []prompb.Label{
+ {Name: "__name__", Value: "epimetheus_test_jobs_processed_total"},
+ {Name: "job_type", Value: jobType},
+ {Name: "status", Value: status},
+ }
+ labels = append(labels, baseLabels...)
+
+ series = append(series, prompb.TimeSeries{
+ Labels: labels,
+ Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}},
+ })
+ }
+ }
+
+ return series
+}
diff --git a/internal/ingester/remotewrite_test.go b/internal/ingester/remotewrite_test.go
new file mode 100644
index 0000000..d1fdee6
--- /dev/null
+++ b/internal/ingester/remotewrite_test.go
@@ -0,0 +1,210 @@
+package ingester
+
+import (
+ "testing"
+ "time"
+
+ "epimetheus/internal/metrics"
+
+ "github.com/prometheus/prometheus/prompb"
+)
+
+func TestNewRemoteWriteIngester(t *testing.T) {
+ ingester := NewRemoteWriteIngester()
+ if ingester.client == nil {
+ t.Error("RemoteWriteIngester.client should not be nil")
+ }
+}
+
+func TestConvertSamplesToTimeSeries(t *testing.T) {
+ now := time.Now()
+ samples := []metrics.Sample{
+ {
+ MetricName: "test_metric1",
+ Labels: map[string]string{"env": "prod", "host": "server1"},
+ Value: 42.5,
+ Timestamp: now,
+ },
+ {
+ MetricName: "test_metric2",
+ Labels: map[string]string{"env": "test"},
+ Value: 100.0,
+ Timestamp: now.Add(-1 * time.Hour),
+ },
+ }
+
+ timeSeries := convertSamplesToTimeSeries(samples)
+
+ if len(timeSeries) != 2 {
+ t.Errorf("Expected 2 time series, got %d", len(timeSeries))
+ }
+
+ // Check first time series
+ ts1 := timeSeries[0]
+ if len(ts1.Labels) != 3 { // __name__ + 2 custom labels
+ t.Errorf("Expected 3 labels, got %d", len(ts1.Labels))
+ }
+
+ hasName := false
+ for _, label := range ts1.Labels {
+ if label.Name == "__name__" && label.Value == "test_metric1" {
+ hasName = true
+ }
+ }
+ if !hasName {
+ t.Error("Missing or incorrect __name__ label")
+ }
+
+ if len(ts1.Samples) != 1 {
+ t.Errorf("Expected 1 sample, got %d", len(ts1.Samples))
+ }
+ if ts1.Samples[0].Value != 42.5 {
+ t.Errorf("Expected value 42.5, got %f", ts1.Samples[0].Value)
+ }
+}
+
+func TestGenerateHistoricTimeSeries(t *testing.T) {
+ timestamp := time.Now().Add(-24 * time.Hour)
+
+ timeSeries := generateHistoricTimeSeries(timestamp)
+
+ if len(timeSeries) == 0 {
+ t.Error("Expected time series to be generated")
+ }
+
+ // Should contain various metric types
+ metricNames := make(map[string]bool)
+ for _, ts := range timeSeries {
+ for _, label := range ts.Labels {
+ if label.Name == "__name__" {
+ metricNames[label.Value] = true
+ }
+ }
+ }
+
+ expectedMetrics := []string{
+ "epimetheus_test_requests_total",
+ "epimetheus_test_active_connections",
+ "epimetheus_test_temperature_celsius",
+ "epimetheus_test_jobs_processed_total",
+ }
+
+ for _, expected := range expectedMetrics {
+ if !metricNames[expected] {
+ t.Errorf("Expected metric %s not found", expected)
+ }
+ }
+}
+
+func TestCreateCounterSeries(t *testing.T) {
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "test-instance"},
+ {Name: "job", Value: "test-job"},
+ }
+
+ ts := createCounterSeries("test_counter", baseLabels, 123.45, 1234567890000)
+
+ if len(ts.Labels) != 3 { // __name__ + 2 base labels
+ t.Errorf("Expected 3 labels, got %d", len(ts.Labels))
+ }
+
+ if len(ts.Samples) != 1 {
+ t.Errorf("Expected 1 sample, got %d", len(ts.Samples))
+ }
+
+ if ts.Samples[0].Value != 123.45 {
+ t.Errorf("Expected value 123.45, got %f", ts.Samples[0].Value)
+ }
+
+ if ts.Samples[0].Timestamp != 1234567890000 {
+ t.Errorf("Expected timestamp 1234567890000, got %d", ts.Samples[0].Timestamp)
+ }
+}
+
+func TestCreateGaugeSeries(t *testing.T) {
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "test-instance"},
+ }
+
+ ts := createGaugeSeries("test_gauge", baseLabels, 67.89, 9876543210000)
+
+ if len(ts.Samples) != 1 {
+ t.Errorf("Expected 1 sample, got %d", len(ts.Samples))
+ }
+
+ if ts.Samples[0].Value != 67.89 {
+ t.Errorf("Expected value 67.89, got %f", ts.Samples[0].Value)
+ }
+}
+
+func TestGenerateHistogramSeries(t *testing.T) {
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "test-instance"},
+ }
+ timestamp := int64(1234567890000)
+
+ series := generateHistogramSeries(baseLabels, timestamp)
+
+ if len(series) == 0 {
+ t.Error("Expected histogram series to be generated")
+ }
+
+ // Should contain buckets, +Inf, sum, and count
+ metricTypes := make(map[string]int)
+ for _, ts := range series {
+ for _, label := range ts.Labels {
+ if label.Name == "__name__" {
+ metricTypes[label.Value]++
+ }
+ }
+ }
+
+ if metricTypes["epimetheus_test_request_duration_seconds_bucket"] == 0 {
+ t.Error("Expected histogram buckets")
+ }
+ if metricTypes["epimetheus_test_request_duration_seconds_sum"] != 1 {
+ t.Error("Expected histogram sum")
+ }
+ if metricTypes["epimetheus_test_request_duration_seconds_count"] != 1 {
+ t.Error("Expected histogram count")
+ }
+}
+
+func TestGenerateLabeledCounterSeries(t *testing.T) {
+ baseLabels := []prompb.Label{
+ {Name: "instance", Value: "test-instance"},
+ }
+ timestamp := int64(1234567890000)
+
+ series := generateLabeledCounterSeries(baseLabels, timestamp)
+
+ if len(series) == 0 {
+ t.Error("Expected labeled counter series to be generated")
+ }
+
+ // Should have combinations of job types and statuses
+ // 3 job types * 2 statuses = 6 series
+ if len(series) != 6 {
+ t.Errorf("Expected 6 labeled counter series, got %d", len(series))
+ }
+
+ // Verify label structure
+ for _, ts := range series {
+ hasJobType := false
+ hasStatus := false
+ for _, label := range ts.Labels {
+ if label.Name == "job_type" {
+ hasJobType = true
+ }
+ if label.Name == "status" {
+ hasStatus = true
+ }
+ }
+ if !hasJobType {
+ t.Error("Expected job_type label")
+ }
+ if !hasStatus {
+ t.Error("Expected status label")
+ }
+ }
+}
diff --git a/internal/metrics/generator.go b/internal/metrics/generator.go
new file mode 100644
index 0000000..c85906a
--- /dev/null
+++ b/internal/metrics/generator.go
@@ -0,0 +1,85 @@
+package metrics
+
+import (
+ "math/rand"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
// Bounds for the randomly generated demo metric values (see Simulate).
const (
	minTemperature = 15.0 // lower bound for the temperature gauge, Celsius
	maxTemperature = 35.0 // upper bound for the temperature gauge, Celsius
	maxConnections = 100  // exclusive upper bound for the connections gauge
	maxRequests    = 10   // upper bound for requests added per simulation step
)
+
// Label value sets used when simulating the jobs-processed counter.
var (
	jobTypes = []string{"email", "report", "backup"}
	statuses = []string{"success", "failed"}
)
+
// Collectors holds Prometheus metric collectors for realtime mode
type Collectors struct {
	// RequestsTotal counts processed requests (monotonically increasing).
	RequestsTotal prometheus.Counter
	// ActiveConnections tracks the current open-connection level.
	ActiveConnections prometheus.Gauge
	// TemperatureCelsius holds the simulated temperature reading.
	TemperatureCelsius prometheus.Gauge
	// RequestDuration records the request latency distribution.
	RequestDuration prometheus.Histogram
	// JobsProcessed counts jobs, partitioned by job_type and status labels.
	JobsProcessed *prometheus.CounterVec
}
+
+// NewCollectors creates new Prometheus metric collectors for testing.
+// All metrics are prefixed with "epimetheus_test_" to clearly indicate
+// they are generated by the prometheus-pusher test/demo functionality.
+func NewCollectors() Collectors {
+ return Collectors{
+ RequestsTotal: prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "epimetheus_test_requests_total",
+ Help: "Total number of requests processed (test metric)",
+ },
+ ),
+ ActiveConnections: prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "epimetheus_test_active_connections",
+ Help: "Number of currently active connections (test metric)",
+ },
+ ),
+ TemperatureCelsius: prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "epimetheus_test_temperature_celsius",
+ Help: "Current temperature in Celsius (test metric)",
+ },
+ ),
+ RequestDuration: prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "epimetheus_test_request_duration_seconds",
+ Help: "Histogram of request duration in seconds (test metric)",
+ Buckets: prometheus.DefBuckets,
+ },
+ ),
+ JobsProcessed: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "epimetheus_test_jobs_processed_total",
+ Help: "Total number of jobs processed by type (test metric)",
+ },
+ []string{"job_type", "status"},
+ ),
+ }
+}
+
+// Simulate generates random metric values for the collectors
+func (c Collectors) Simulate() {
+ c.RequestsTotal.Add(float64(rand.Intn(maxRequests) + 1))
+ c.ActiveConnections.Set(float64(rand.Intn(maxConnections)))
+ c.TemperatureCelsius.Set(minTemperature + rand.Float64()*(maxTemperature-minTemperature))
+
+ for i := 0; i < rand.Intn(5)+1; i++ {
+ duration := rand.Float64() * 2
+ c.RequestDuration.Observe(duration)
+ }
+
+ for _, jobType := range jobTypes {
+ status := statuses[rand.Intn(len(statuses))]
+ c.JobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5)))
+ }
+}
diff --git a/internal/metrics/generator_test.go b/internal/metrics/generator_test.go
new file mode 100644
index 0000000..69395eb
--- /dev/null
+++ b/internal/metrics/generator_test.go
@@ -0,0 +1,53 @@
+package metrics
+
+import (
+ "testing"
+)
+
+func TestNewCollectors(t *testing.T) {
+ collectors := NewCollectors()
+
+ if collectors.RequestsTotal == nil {
+ t.Error("RequestsTotal should not be nil")
+ }
+ if collectors.ActiveConnections == nil {
+ t.Error("ActiveConnections should not be nil")
+ }
+ if collectors.TemperatureCelsius == nil {
+ t.Error("TemperatureCelsius should not be nil")
+ }
+ if collectors.RequestDuration == nil {
+ t.Error("RequestDuration should not be nil")
+ }
+ if collectors.JobsProcessed == nil {
+ t.Error("JobsProcessed should not be nil")
+ }
+}
+
+func TestCollectors_Simulate(t *testing.T) {
+ collectors := NewCollectors()
+
+ // Should not panic
+ collectors.Simulate()
+
+ // Run multiple times to test randomness
+ for i := 0; i < 10; i++ {
+ collectors.Simulate()
+ }
+}
+
+func TestCollectors_SimulateMetrics(t *testing.T) {
+ collectors := NewCollectors()
+
+ // Test that metrics get values after simulation
+ collectors.Simulate()
+
+ // We can't easily inspect the values without the prometheus client,
+ // but we can verify the collectors were created properly
+ if collectors.RequestsTotal == nil {
+ t.Error("RequestsTotal not initialized")
+ }
+ if collectors.JobsProcessed == nil {
+ t.Error("JobsProcessed not initialized")
+ }
+}
diff --git a/internal/metrics/sample.go b/internal/metrics/sample.go
new file mode 100644
index 0000000..04360f5
--- /dev/null
+++ b/internal/metrics/sample.go
@@ -0,0 +1,34 @@
+package metrics
+
+import "time"
+
+// Sample represents a single metric sample with timestamp
+type Sample struct {
+ MetricName string
+ Labels map[string]string
+ Value float64
+ Timestamp time.Time
+}
+
+// NewSample creates a new Sample
+func NewSample(name string, labels map[string]string, value float64, timestamp time.Time) Sample {
+ if labels == nil {
+ labels = make(map[string]string)
+ }
+ return Sample{
+ MetricName: name,
+ Labels: labels,
+ Value: value,
+ Timestamp: timestamp,
+ }
+}
+
+// Age returns how old the sample is
+func (s Sample) Age() time.Duration {
+ return time.Since(s.Timestamp)
+}
+
+// IsRecent returns true if the sample is recent enough for realtime ingestion
+func (s Sample) IsRecent(threshold time.Duration) bool {
+ return s.Age() < threshold
+}
diff --git a/internal/metrics/sample_test.go b/internal/metrics/sample_test.go
new file mode 100644
index 0000000..2ffd78b
--- /dev/null
+++ b/internal/metrics/sample_test.go
@@ -0,0 +1,160 @@
+package metrics
+
+import (
+ "testing"
+ "time"
+)
+
+func TestNewSample(t *testing.T) {
+ tests := []struct {
+ name string
+ metric string
+ labels map[string]string
+ value float64
+ timestamp time.Time
+ wantNil bool
+ }{
+ {
+ name: "with labels",
+ metric: "test_metric",
+ labels: map[string]string{"env": "prod", "host": "server1"},
+ value: 42.5,
+ timestamp: time.Now(),
+ wantNil: false,
+ },
+ {
+ name: "nil labels initialized",
+ metric: "test_metric",
+ labels: nil,
+ value: 100,
+ timestamp: time.Now(),
+ wantNil: false,
+ },
+ {
+ name: "empty labels",
+ metric: "test_metric",
+ labels: map[string]string{},
+ value: 0,
+ timestamp: time.Now(),
+ wantNil: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ sample := NewSample(tt.metric, tt.labels, tt.value, tt.timestamp)
+
+ if sample.MetricName != tt.metric {
+ t.Errorf("MetricName = %v, want %v", sample.MetricName, tt.metric)
+ }
+ if sample.Value != tt.value {
+ t.Errorf("Value = %v, want %v", sample.Value, tt.value)
+ }
+ if sample.Labels == nil {
+ t.Error("Labels should never be nil")
+ }
+ if !sample.Timestamp.Equal(tt.timestamp) {
+ t.Errorf("Timestamp = %v, want %v", sample.Timestamp, tt.timestamp)
+ }
+ })
+ }
+}
+
+func TestSample_Age(t *testing.T) {
+ tests := []struct {
+ name string
+ sample Sample
+ wantNear time.Duration
+ }{
+ {
+ name: "recent sample",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-5 * time.Minute),
+ },
+ wantNear: 5 * time.Minute,
+ },
+ {
+ name: "old sample",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-1 * time.Hour),
+ },
+ wantNear: 1 * time.Hour,
+ },
+ {
+ name: "very recent",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-10 * time.Second),
+ },
+ wantNear: 10 * time.Second,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ age := tt.sample.Age()
+ // Allow 1 second tolerance for test execution time
+ if age < tt.wantNear-time.Second || age > tt.wantNear+time.Second {
+ t.Errorf("Age() = %v, want near %v", age, tt.wantNear)
+ }
+ })
+ }
+}
+
+func TestSample_IsRecent(t *testing.T) {
+ threshold := 5 * time.Minute
+
+ tests := []struct {
+ name string
+ sample Sample
+ threshold time.Duration
+ want bool
+ }{
+ {
+ name: "within threshold",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-2 * time.Minute),
+ },
+ threshold: threshold,
+ want: true,
+ },
+ {
+ name: "beyond threshold",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-10 * time.Minute),
+ },
+ threshold: threshold,
+ want: false,
+ },
+ {
+ name: "exactly at threshold",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-5 * time.Minute),
+ },
+ threshold: threshold,
+ want: false,
+ },
+ {
+ name: "very recent",
+ sample: Sample{
+ MetricName: "test",
+ Timestamp: time.Now().Add(-10 * time.Second),
+ },
+ threshold: threshold,
+ want: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := tt.sample.IsRecent(tt.threshold); got != tt.want {
+ t.Errorf("IsRecent() = %v, want %v (age: %v)", got, tt.want, tt.sample.Age())
+ }
+ })
+ }
+}
diff --git a/internal/parser/csv.go b/internal/parser/csv.go
new file mode 100644
index 0000000..64d16e5
--- /dev/null
+++ b/internal/parser/csv.go
@@ -0,0 +1,101 @@
+package parser
+
+import (
+ "context"
+ "encoding/csv"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
// CSVParser reads metric samples encoded as CSV records.
// The zero value is usable; NewCSVParser exists for API symmetry with
// the other parsers.
type CSVParser struct{}

// NewCSVParser returns a ready-to-use CSV parser.
func NewCSVParser() *CSVParser {
	return new(CSVParser)
}
+
+// Parse reads metrics from CSV format
+// Format: metric_name,label1=value1;label2=value2,value,timestamp_ms
+func (p *CSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var samples []metrics.Sample
+
+ csvReader := csv.NewReader(reader)
+ csvReader.Comment = '#'
+
+ lineNum := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ record, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("line %d: %w", lineNum, err)
+ }
+ lineNum++
+
+ if len(record) < 3 {
+ continue // Skip invalid records
+ }
+
+ sample, err := p.parseRecord(record, lineNum)
+ if err != nil {
+ continue // Skip records with errors
+ }
+
+ samples = append(samples, sample)
+ }
+
+ return samples, nil
+}
+
// parseRecord converts a single CSV record into a metrics.Sample.
//
// Field layout: [0] metric name (required, trimmed), [1] "k=v;k=v" label
// string, [2] float value, [3] optional epoch-milliseconds timestamp.
// A missing or unparsable timestamp silently falls back to time.Now().
// The lineNum parameter is currently unused (reserved for error context).
func (p *CSVParser) parseRecord(record []string, lineNum int) (metrics.Sample, error) {
	metricName := strings.TrimSpace(record[0])
	if metricName == "" {
		return metrics.Sample{}, fmt.Errorf("empty metric name")
	}

	labels := parseLabels(record[1])

	value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64)
	if err != nil {
		return metrics.Sample{}, fmt.Errorf("invalid value: %w", err)
	}

	// Default to ingestion time; override when a fourth field holding
	// epoch milliseconds parses cleanly.
	timestamp := time.Now()
	if len(record) > 3 && record[3] != "" {
		timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64)
		if err == nil {
			timestamp = time.UnixMilli(timestampMs)
		}
	}

	return metrics.NewSample(metricName, labels, value, timestamp), nil
}
+
// parseLabels splits a "k1=v1;k2=v2" string into a label map. Pairs
// without an '=' are ignored; keys and values are whitespace-trimmed.
// An empty input yields an empty (non-nil) map.
func parseLabels(labelStr string) map[string]string {
	labels := make(map[string]string)
	if labelStr == "" {
		return labels
	}

	for _, pair := range strings.Split(labelStr, ";") {
		key, value, found := strings.Cut(pair, "=")
		if !found {
			continue
		}
		labels[strings.TrimSpace(key)] = strings.TrimSpace(value)
	}
	return labels
}
diff --git a/internal/parser/csv_test.go b/internal/parser/csv_test.go
new file mode 100644
index 0000000..ffe9034
--- /dev/null
+++ b/internal/parser/csv_test.go
@@ -0,0 +1,175 @@
+package parser
+
+import (
+ "context"
+ "strings"
+ "testing"
+ "time"
+)
+
// TestCSVParser_Parse drives CSVParser.Parse through a table of inputs,
// checking the number of samples produced and whether an error is
// reported. Records with a bad value are skipped silently, while a
// structurally malformed CSV record aborts the whole parse.
func TestCSVParser_Parse(t *testing.T) {
	tests := []struct {
		name      string
		input     string
		wantCount int
		wantErr   bool
	}{
		{
			name:      "valid single line",
			input:     `test_metric,env=prod;host=server1,42.5,1234567890000`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name: "multiple lines",
			input: `metric1,label1=value1,100,1234567890000
metric2,label2=value2,200,1234567891000
metric3,label3=value3,300,1234567892000`,
			wantCount: 3,
			wantErr:   false,
		},
		{
			name: "with comments",
			input: `# This is a comment
metric1,env=test,50,1234567890000
# Another comment
metric2,env=prod,75,1234567891000`,
			wantCount: 2,
			wantErr:   false,
		},
		{
			name:      "no timestamp defaults to now",
			input:     `metric1,env=test,100`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name:      "no labels",
			input:     `metric1,,100,1234567890000`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name:      "empty input",
			input:     "",
			wantCount: 0,
			wantErr:   false,
		},
		{
			// The bare "invalid" record has a different field count than the
			// first record, which trips encoding/csv's field-count check and
			// aborts the parse with an error.
			name: "invalid line causes error",
			input: `metric1,env=test,100,1234567890000
invalid
metric2,env=prod,200,1234567891000`,
			wantCount: 0,
			wantErr:   true,
		},
		{
			// A non-numeric value fails strconv.ParseFloat; the record is
			// skipped rather than failing the parse.
			name: "invalid value skipped",
			input: `metric1,env=test,not_a_number,1234567890000
metric2,env=prod,200,1234567891000`,
			wantCount: 1,
			wantErr:   false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			parser := NewCSVParser()
			reader := strings.NewReader(tt.input)
			ctx := context.Background()

			samples, err := parser.Parse(ctx, reader)

			if (err != nil) != tt.wantErr {
				t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if len(samples) != tt.wantCount {
				t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount)
			}
		})
	}
}
+
// TestCSVParser_ParseLabels covers the "k=v;k=v" label-string helper:
// multiple pairs, empty input, malformed pairs (silently dropped), and
// whitespace trimming of keys and values.
func TestCSVParser_ParseLabels(t *testing.T) {
	tests := []struct {
		name  string
		input string
		want  map[string]string
	}{
		{
			name:  "single label",
			input: "env=prod",
			want:  map[string]string{"env": "prod"},
		},
		{
			name:  "multiple labels",
			input: "env=prod;host=server1;region=us-west",
			want:  map[string]string{"env": "prod", "host": "server1", "region": "us-west"},
		},
		{
			name:  "empty string",
			input: "",
			want:  map[string]string{},
		},
		{
			name:  "invalid label format skipped",
			input: "env=prod;invalid;host=server1",
			want:  map[string]string{"env": "prod", "host": "server1"},
		},
		{
			name:  "with spaces",
			input: " env = prod ; host = server1 ",
			want:  map[string]string{"env": "prod", "host": "server1"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := parseLabels(tt.input)
			if len(got) != len(tt.want) {
				t.Errorf("parseLabels() returned %d labels, want %d", len(got), len(tt.want))
			}
			for k, v := range tt.want {
				if got[k] != v {
					t.Errorf("parseLabels()[%s] = %v, want %v", k, got[k], v)
				}
			}
		})
	}
}
+
+func TestCSVParser_ParseWithContext(t *testing.T) {
+ t.Run("context cancellation", func(t *testing.T) {
+ parser := NewCSVParser()
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel() // Cancel immediately
+
+ input := strings.NewReader(`metric1,env=test,100,1234567890000`)
+ _, err := parser.Parse(ctx, input)
+
+ if err != context.Canceled {
+ t.Errorf("Expected context.Canceled error, got %v", err)
+ }
+ })
+}
+
// TestCSVParser_ParseTimestamp verifies the fourth CSV field is decoded
// as epoch milliseconds into the sample's timestamp.
func TestCSVParser_ParseTimestamp(t *testing.T) {
	parser := NewCSVParser()
	input := `metric1,env=test,100,1234567890000`
	reader := strings.NewReader(input)
	ctx := context.Background()

	samples, err := parser.Parse(ctx, reader)
	if err != nil {
		t.Fatalf("Parse() error = %v", err)
	}
	if len(samples) != 1 {
		t.Fatalf("Expected 1 sample, got %d", len(samples))
	}

	expectedTime := time.UnixMilli(1234567890000)
	if !samples[0].Timestamp.Equal(expectedTime) {
		t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime)
	}
}
diff --git a/internal/parser/json.go b/internal/parser/json.go
new file mode 100644
index 0000000..3b8c2e8
--- /dev/null
+++ b/internal/parser/json.go
@@ -0,0 +1,62 @@
+package parser
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
// JSONParser reads metric samples encoded as a JSON array of objects.
// The zero value is usable; NewJSONParser exists for API symmetry.
type JSONParser struct{}

// NewJSONParser returns a ready-to-use JSON parser.
func NewJSONParser() *JSONParser {
	return new(JSONParser)
}
+
// jsonSample is the wire format for a single metric in a JSON payload;
// JSONParser.Parse decodes an array of these objects.
type jsonSample struct {
	// Metric is the metric name; entries with an empty name are skipped.
	Metric string `json:"metric"`
	// Labels is an optional label set; nil is normalized to an empty map.
	Labels map[string]string `json:"labels"`
	// Value is the sample's numeric value.
	Value float64 `json:"value"`
	// TimestampMs is an optional epoch-milliseconds timestamp; when zero
	// or negative, the parse time is used instead.
	TimestampMs int64 `json:"timestamp_ms,omitempty"`
}
+
+// Parse reads metrics from JSON format
+func (p *JSONParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
+ var rawSamples []jsonSample
+
+ decoder := json.NewDecoder(reader)
+ if err := decoder.Decode(&rawSamples); err != nil {
+ return nil, fmt.Errorf("failed to parse JSON: %w", err)
+ }
+
+ samples := make([]metrics.Sample, 0, len(rawSamples))
+ for _, raw := range rawSamples {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ if raw.Metric == "" {
+ continue
+ }
+
+ timestamp := time.Now()
+ if raw.TimestampMs > 0 {
+ timestamp = time.UnixMilli(raw.TimestampMs)
+ }
+
+ if raw.Labels == nil {
+ raw.Labels = make(map[string]string)
+ }
+
+ samples = append(samples, metrics.NewSample(raw.Metric, raw.Labels, raw.Value, timestamp))
+ }
+
+ return samples, nil
+}
diff --git a/internal/parser/json_test.go b/internal/parser/json_test.go
new file mode 100644
index 0000000..d521942
--- /dev/null
+++ b/internal/parser/json_test.go
@@ -0,0 +1,177 @@
+package parser
+
+import (
+ "context"
+ "strings"
+ "testing"
+ "time"
+)
+
// TestJSONParser_Parse drives JSONParser.Parse through a table of JSON
// payloads, checking sample counts and error outcomes. Entries with an
// empty metric name are dropped; malformed JSON fails the whole parse.
func TestJSONParser_Parse(t *testing.T) {
	tests := []struct {
		name      string
		input     string
		wantCount int
		wantErr   bool
	}{
		{
			name: "valid single sample",
			input: `[
				{
					"metric": "test_metric",
					"labels": {"env": "prod", "host": "server1"},
					"value": 42.5,
					"timestamp_ms": 1234567890000
				}
			]`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name: "multiple samples",
			input: `[
				{"metric": "metric1", "labels": {"env": "prod"}, "value": 100, "timestamp_ms": 1234567890000},
				{"metric": "metric2", "labels": {"env": "test"}, "value": 200, "timestamp_ms": 1234567891000},
				{"metric": "metric3", "labels": {"env": "dev"}, "value": 300, "timestamp_ms": 1234567892000}
			]`,
			wantCount: 3,
			wantErr:   false,
		},
		{
			name: "no timestamp defaults to now",
			input: `[
				{"metric": "test_metric", "labels": {"env": "prod"}, "value": 100}
			]`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name: "no labels",
			input: `[
				{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}
			]`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name: "empty metric skipped",
			input: `[
				{"metric": "", "labels": {"env": "prod"}, "value": 100},
				{"metric": "valid_metric", "labels": {"env": "test"}, "value": 200}
			]`,
			wantCount: 1,
			wantErr:   false,
		},
		{
			name:      "empty array",
			input:     `[]`,
			wantCount: 0,
			wantErr:   false,
		},
		{
			name:      "invalid json",
			input:     `{not valid json}`,
			wantCount: 0,
			wantErr:   true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			parser := NewJSONParser()
			reader := strings.NewReader(tt.input)
			ctx := context.Background()

			samples, err := parser.Parse(ctx, reader)

			if (err != nil) != tt.wantErr {
				t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if len(samples) != tt.wantCount {
				t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount)
			}
		})
	}
}
+
// TestJSONParser_ParseWithContext checks that an already-cancelled context
// is surfaced by Parse. The assertion is deliberately lenient (logs rather
// than fails) because the original implementation only checked the context
// per decoded sample, after the full JSON decode.
func TestJSONParser_ParseWithContext(t *testing.T) {
	t.Run("context check during parse", func(t *testing.T) {
		parser := NewJSONParser()
		ctx, cancel := context.WithCancel(context.Background())

		// Create valid input with empty metrics that will be filtered
		input := `[
			{"metric": "", "value": 1},
			{"metric": "", "value": 2},
			{"metric": "", "value": 3}
		]`

		cancel() // Cancel before parsing

		reader := strings.NewReader(input)
		_, err := parser.Parse(ctx, reader)

		// Context cancellation should be detected during sample processing
		if err != context.Canceled {
			// Note: JSON decoder may finish before context is checked
			// This test verifies context support exists, but timing is not guaranteed
			t.Logf("Got error: %v (context may not be checked until after JSON decode)", err)
		}
	})
}
+
// TestJSONParser_ParseTimestamp verifies timestamp_ms is decoded as epoch
// milliseconds into the sample's timestamp.
func TestJSONParser_ParseTimestamp(t *testing.T) {
	parser := NewJSONParser()
	input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]`
	reader := strings.NewReader(input)
	ctx := context.Background()

	samples, err := parser.Parse(ctx, reader)
	if err != nil {
		t.Fatalf("Parse() error = %v", err)
	}
	if len(samples) != 1 {
		t.Fatalf("Expected 1 sample, got %d", len(samples))
	}

	expectedTime := time.UnixMilli(1234567890000)
	if !samples[0].Timestamp.Equal(expectedTime) {
		t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime)
	}
}
+
// TestJSONParser_ParseLabels verifies the labels object is carried through
// to the sample unchanged (all keys present, all values exact).
func TestJSONParser_ParseLabels(t *testing.T) {
	parser := NewJSONParser()
	input := `[{
		"metric": "test_metric",
		"labels": {"env": "prod", "host": "server1", "region": "us-west"},
		"value": 100
	}]`
	reader := strings.NewReader(input)
	ctx := context.Background()

	samples, err := parser.Parse(ctx, reader)
	if err != nil {
		t.Fatalf("Parse() error = %v", err)
	}
	if len(samples) != 1 {
		t.Fatalf("Expected 1 sample, got %d", len(samples))
	}

	expectedLabels := map[string]string{
		"env":    "prod",
		"host":   "server1",
		"region": "us-west",
	}

	if len(samples[0].Labels) != len(expectedLabels) {
		t.Errorf("Got %d labels, want %d", len(samples[0].Labels), len(expectedLabels))
	}

	for k, v := range expectedLabels {
		if samples[0].Labels[k] != v {
			t.Errorf("Label[%s] = %v, want %v", k, samples[0].Labels[k], v)
		}
	}
}
diff --git a/internal/parser/parser.go b/internal/parser/parser.go
new file mode 100644
index 0000000..860baa3
--- /dev/null
+++ b/internal/parser/parser.go
@@ -0,0 +1,56 @@
+package parser
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+
+ "epimetheus/internal/metrics"
+)
+
// Parser is the common interface implemented by all metric parsers
// (CSVParser, JSONParser). Parse consumes the reader fully, returns the
// decoded samples, and is expected to honor ctx cancellation.
type Parser interface {
	Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error)
}
+
+// ParseFile parses metrics from a file.
+func ParseFile(ctx context.Context, filename, format string) ([]metrics.Sample, error) {
+ file, err := os.Open(filename)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open file: %w", err)
+ }
+ defer file.Close()
+
+ return parseWithFormat(ctx, file, format)
+}
+
+// ParseStdin parses metrics from standard input.
+func ParseStdin(ctx context.Context, format string) ([]metrics.Sample, error) {
+ return parseWithFormat(ctx, os.Stdin, format)
+}
+
+// parseWithFormat parses metrics using the specified format.
+func parseWithFormat(ctx context.Context, reader io.Reader, format string) ([]metrics.Sample, error) {
+ var parser Parser
+
+ switch format {
+ case "csv":
+ parser = NewCSVParser()
+ case "json":
+ parser = NewJSONParser()
+ default:
+ return nil, fmt.Errorf("unsupported format: %s (use csv or json)", format)
+ }
+
+ samples, err := parser.Parse(ctx, reader)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse metrics: %w", err)
+ }
+
+ if len(samples) == 0 {
+ return nil, fmt.Errorf("no valid samples found")
+ }
+
+ return samples, nil
+}
diff --git a/internal/parser/parser_test.go b/internal/parser/parser_test.go
new file mode 100644
index 0000000..05255a5
--- /dev/null
+++ b/internal/parser/parser_test.go
@@ -0,0 +1,99 @@
+package parser
+
+import (
+ "context"
+ "strings"
+ "testing"
+)
+
+func TestParseFile_CSV(t *testing.T) {
+ // We can't easily test file operations without creating temp files
+ // So we'll test the error case
+ ctx := context.Background()
+ _, err := ParseFile(ctx, "/nonexistent/file.csv", "csv")
+
+ if err == nil {
+ t.Error("Expected error for nonexistent file")
+ }
+}
+
// TestParseWithFormat_CSV checks the "csv" dispatch path end-to-end on a
// single valid record.
func TestParseWithFormat_CSV(t *testing.T) {
	ctx := context.Background()
	input := `test_metric,env=prod,100,1234567890000`
	reader := strings.NewReader(input)

	samples, err := parseWithFormat(ctx, reader, "csv")
	if err != nil {
		t.Fatalf("parseWithFormat(csv) error = %v", err)
	}
	if len(samples) != 1 {
		t.Errorf("Expected 1 sample, got %d", len(samples))
	}
}
+
// TestParseWithFormat_JSON checks the "json" dispatch path end-to-end on a
// single valid object.
func TestParseWithFormat_JSON(t *testing.T) {
	ctx := context.Background()
	input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]`
	reader := strings.NewReader(input)

	samples, err := parseWithFormat(ctx, reader, "json")
	if err != nil {
		t.Fatalf("parseWithFormat(json) error = %v", err)
	}
	if len(samples) != 1 {
		t.Errorf("Expected 1 sample, got %d", len(samples))
	}
}
+
// TestParseWithFormat_UnsupportedFormat pins the exact error message for
// an unknown format string.
func TestParseWithFormat_UnsupportedFormat(t *testing.T) {
	ctx := context.Background()
	reader := strings.NewReader("")

	_, err := parseWithFormat(ctx, reader, "xml")
	if err == nil {
		t.Error("Expected error for unsupported format")
	}
	if err.Error() != "unsupported format: xml (use csv or json)" {
		t.Errorf("Unexpected error message: %v", err)
	}
}
+
// TestParseWithFormat_EmptyResult pins the exact error returned when the
// parser succeeds but yields zero samples.
func TestParseWithFormat_EmptyResult(t *testing.T) {
	ctx := context.Background()
	input := `[]` // Empty JSON array
	reader := strings.NewReader(input)

	_, err := parseWithFormat(ctx, reader, "json")
	if err == nil {
		t.Error("Expected error for empty samples")
	}
	if err.Error() != "no valid samples found" {
		t.Errorf("Expected 'no valid samples found' error, got: %v", err)
	}
}
+
// TestParseStdin_Format exercises the invalid-format error path. Note it
// actually calls parseWithFormat (ParseStdin's delegate) because stdin
// itself cannot be easily substituted here.
func TestParseStdin_Format(t *testing.T) {
	// We can't easily test stdin without mocking,
	// but we can verify the error path
	ctx := context.Background()

	// Test with invalid format
	_, err := parseWithFormat(ctx, strings.NewReader(""), "invalid_format")
	if err == nil {
		t.Error("Expected error for invalid format")
	}
}
+
+func TestNewCSVParser(t *testing.T) {
+ parser := NewCSVParser()
+ if parser == nil {
+ t.Error("NewCSVParser() returned nil")
+ }
+}
+
+func TestNewJSONParser(t *testing.T) {
+ parser := NewJSONParser()
+ if parser == nil {
+ t.Error("NewJSONParser() returned nil")
+ }
+}
diff --git a/internal/parser/tabular_csv.go b/internal/parser/tabular_csv.go
new file mode 100644
index 0000000..bfa4fbe
--- /dev/null
+++ b/internal/parser/tabular_csv.go
@@ -0,0 +1,256 @@
+package parser
+
+import (
+ "context"
+ "encoding/csv"
+ "fmt"
+ "io"
+ "log"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "epimetheus/internal/metrics"
+ "epimetheus/internal/resolver"
+)
+
// TabularCSVParser parses tabular CSV files where the first row is headers
// and all subsequent rows are data. Numeric columns become metrics, string columns become labels.
type TabularCSVParser struct {
	metricName    string                // base name; per-column metrics are named metricName + "_" + column
	timestamp     time.Time             // base timestamp; Parse adds a per-row microsecond offset
	resolver      *resolver.DNSResolver // DNS resolver for IP addresses
	resolveLabels map[string]bool       // Set of label names to resolve via DNS
}
+
+// SaveDNSCache saves the DNS cache to disk (called after successful ingestion)
+func (p *TabularCSVParser) SaveDNSCache() {
+ if p.resolver != nil {
+ cacheSize := p.resolver.GetCacheSize()
+ if err := p.resolver.SaveCache(); err != nil {
+ // Log error but don't fail - cache saving is not critical
+ log.Printf("⚠️ Failed to save DNS cache: %v", err)
+ } else if cacheSize > 0 {
+ log.Printf("💾 Saved DNS cache with %d entries", cacheSize)
+ }
+ }
+}
+
// numericLabelColumns is a list of column names that should be treated as labels
// even if they contain numeric values. This is useful for categorical codes like
// HTTP status codes (200, 404, 500) or date periods (20160101, 20160102) that should be labels, not metrics.
//
// Column names are matched case-insensitively: parseRecord lowercases the
// (already sanitized) header before the lookup, so e.g. "ResponseCode"
// matches "responsecode" and "response-code" matches "response_code".
//
// Common examples:
//   - responsecode: HTTP response codes (200, 404, 500)
//   - statuscode: Application status codes
//   - status: Status identifiers
//   - code: Generic codes
//   - period: Date periods (20160101, 20160102, YYYYMMDD format)
var numericLabelColumns = map[string]bool{
	"responsecode":  true,
	"statuscode":    true,
	"status_code":   true,
	"response_code": true,
	"status":        true,
	"code":          true,
	"period":        true,
}
+
+// NewTabularCSVParser creates a new tabular CSV parser
+// metricName is the name of the metric to create
+// timestamp is the timestamp to use for all samples
+// resolveIPLabels is a list of label names containing IP addresses to resolve via DNS
+func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser {
+ // Build set for O(1) lookup
+ resolveMap := make(map[string]bool)
+ for _, label := range resolveIPLabels {
+ resolveMap[label] = true
+ }
+
+ return &TabularCSVParser{
+ metricName: metricName,
+ timestamp: timestamp,
+ resolver: resolver.NewDNSResolver(),
+ resolveLabels: resolveMap,
+ }
+}
+
// Parse reads metrics from a tabular CSV format.
// First row: column headers (trimmed and sanitized for Prometheus).
// Subsequent rows: data values.
// Numeric columns become metrics (one sample per numeric cell); string
// columns — and columns listed in numericLabelColumns — become labels.
// Rows whose field count differs from the header row are skipped.
// Each row's timestamp is the base timestamp plus a per-row microsecond
// offset, so timestamps differ even for identical label sets.
func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) {
	var samples []metrics.Sample

	csvReader := csv.NewReader(reader)
	csvReader.Comment = '#'
	csvReader.TrimLeadingSpace = true
	csvReader.FieldsPerRecord = -1 // Allow variable number of fields

	// Read header row
	headers, err := csvReader.Read()
	if err != nil {
		return nil, fmt.Errorf("failed to read header row: %w", err)
	}

	if len(headers) == 0 {
		return nil, fmt.Errorf("empty header row")
	}

	// Trim whitespace and sanitize headers for Prometheus label compatibility
	for i := range headers {
		headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i]))
	}

	// Read data rows
	rowNum := 1
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		record, err := csvReader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("line %d: %w", rowNum+1, err)
		}
		rowNum++

		if len(record) != len(headers) {
			// Skip rows with mismatched column counts
			continue
		}

		// Add microsecond offset to ensure unique timestamps per row
		// This prevents duplicate timestamp errors when multiple rows
		// have the same label combinations
		rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond)

		rowSamples := p.parseRecord(headers, record, rowTimestamp)
		samples = append(samples, rowSamples...)
	}

	return samples, nil
}
+
// parseRecord converts a single CSV row into multiple Samples.
// Numeric columns become separate metrics, string columns become labels.
//
// Exception: Column names in numericLabelColumns are always treated as labels,
// even if they contain numbers (e.g. responsecode: 200, 404, 500).
//
// DNS Resolution: Label values matching IP addresses in resolveLabels will be
// resolved to hostnames via reverse DNS lookup.
//
// headers and record must be the same length (callers enforce this);
// every returned sample carries its own copy of the label map so samples
// do not alias each other.
func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample {
	var samples []metrics.Sample

	// First pass: identify numeric vs string columns and collect label values
	labels := make(map[string]string)
	var numericCols []struct {
		name  string
		value float64
	}

	for i, header := range headers {
		if i >= len(record) {
			continue
		}

		value := strings.TrimSpace(record[i])

		// Check if this column is in the exception list (should be label even if numeric)
		headerLower := strings.ToLower(header)
		if numericLabelColumns[headerLower] {
			// This column should always be a label, even if it's numeric
			labels[header] = value
			continue
		}

		// Try to parse as number
		if floatVal, err := strconv.ParseFloat(value, 64); err == nil {
			// It's a numeric column - will become a metric
			numericCols = append(numericCols, struct {
				name  string
				value float64
			}{name: header, value: floatVal})
		} else {
			// It's a string column - becomes a label
			labels[header] = value
		}
	}

	// Resolve IP addresses to hostnames
	for labelName := range p.resolveLabels {
		if ipValue, exists := labels[labelName]; exists {
			// Attempt DNS resolution
			if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" {
				// Replace IP with hostname
				labels[labelName] = hostname
			}
			// If resolution failed, ipValue remains unchanged
		}
	}

	// Create one sample per numeric column
	for _, numCol := range numericCols {
		metricName := p.metricName + "_" + numCol.name
		// Copy labels for each metric
		metricLabels := make(map[string]string, len(labels))
		for k, v := range labels {
			metricLabels[k] = v
		}
		samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp))
	}

	return samples
}
+
// Compiled once at package scope: the previous implementation called
// regexp.MustCompile inside sanitizeLabelName, recompiling both patterns
// for every header cell parsed.
var (
	invalidLabelCharRe   = regexp.MustCompile(`[^a-zA-Z0-9_]`)
	repeatedUnderscoreRe = regexp.MustCompile(`_+`)
)

// sanitizeLabelName converts a string to a valid Prometheus label name.
// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]*
// Examples:
//
//	"avg(totaltime)" -> "avg_totaltime"
//	"sum(rcv)" -> "sum_rcv"
//	"response-code" -> "response_code"
//	"123invalid" -> "label_123invalid"
//
// Empty input, or input that reduces to nothing, yields "unknown".
func sanitizeLabelName(name string) string {
	if name == "" {
		return "unknown"
	}

	// Replace invalid characters with underscore
	sanitized := invalidLabelCharRe.ReplaceAllString(name, "_")

	// Ensure it starts with a letter or underscore
	if len(sanitized) > 0 && sanitized[0] >= '0' && sanitized[0] <= '9' {
		sanitized = "label_" + sanitized
	}

	// Remove consecutive underscores
	sanitized = repeatedUnderscoreRe.ReplaceAllString(sanitized, "_")

	// Remove leading/trailing underscores
	sanitized = strings.Trim(sanitized, "_")

	if sanitized == "" {
		return "unknown"
	}

	return sanitized
}
+
// parseFloat64 parses a whitespace-trimmed string as a float64, returning
// 0.0 on any parse failure. (Currently unused within this file.)
func parseFloat64(s string) float64 {
	if f, err := strconv.ParseFloat(strings.TrimSpace(s), 64); err == nil {
		return f
	}
	return 0.0
}
diff --git a/internal/parser/tabular_csv_test.go b/internal/parser/tabular_csv_test.go
new file mode 100644
index 0000000..07818e8
--- /dev/null
+++ b/internal/parser/tabular_csv_test.go
@@ -0,0 +1,469 @@
+package parser
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+
+ "epimetheus/internal/metrics"
+)
+
+func TestTabularCSVParser_Parse(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ metricName string
+ expectedCount int
+ wantErr bool
+ }{
+ {
+ name: "simple tabular CSV with numeric and text columns",
+ input: `responsecode,httpmethod,user,totaltime
+200,GET,alice,50.5
+404,POST,bob,100.2
+500,GET,charlie,75.0`,
+ metricName: "test_metric",
+ expectedCount: 6, // 3 rows * 2 numeric columns (responsecode, totaltime)
+ wantErr: false,
+ },
+ {
+ name: "CSV with mixed data types",
+ input: `col1,col2,col3
+1,text,3.14
+2,more,2.71
+3,data,1.41`,
+ metricName: "mixed_metric",
+ expectedCount: 6, // 3 rows * 2 numeric columns (col1, col3)
+ wantErr: false,
+ },
+ {
+ name: "CSV with whitespace",
+ input: ` col1 , col2 , col3
+ 1 , value2 , 3
+ 4 , value5 , 6 `,
+ metricName: "whitespace_metric",
+ expectedCount: 4, // 2 rows * 2 numeric columns (col1, col3)
+ wantErr: false,
+ },
+ {
+ name: "CSV with comments",
+ input: `# This is a comment
+col1,col2,col3
+# Another comment
+1,value2,3`,
+ metricName: "comment_metric",
+ expectedCount: 2, // 1 row * 2 numeric columns (col1, col3)
+ wantErr: false,
+ },
+ {
+ name: "empty CSV",
+ input: "",
+ metricName: "empty_metric",
+ expectedCount: 0,
+ wantErr: true, // No header row
+ },
+ {
+ name: "header only",
+ input: `col1,col2,col3
+`,
+ metricName: "header_only",
+ expectedCount: 0,
+ wantErr: false,
+ },
+ {
+ name: "mismatched columns - skipped",
+ input: `col1,col2,col3
+1,value2,3
+value4,value5
+6,value7,8`,
+ metricName: "mismatched_metric",
+ expectedCount: 4, // 2 matching rows * 2 numeric columns
+ wantErr: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+ parser := NewTabularCSVParser(tt.metricName, timestamp, []string{}) // No DNS resolution
+
+ reader := strings.NewReader(tt.input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if err == nil && len(samples) != tt.expectedCount {
+ t.Errorf("Parse() got %d samples, want %d", len(samples), tt.expectedCount)
+ }
+
+ // Verify all samples have the correct base metric name and timestamp
+ for _, sample := range samples {
+ if !strings.HasPrefix(sample.MetricName, tt.metricName+"_") {
+ t.Errorf("Sample metric name = %s, want prefix %s_", sample.MetricName, tt.metricName)
+ }
+ if !sample.Timestamp.Equal(timestamp) {
+ t.Errorf("Sample timestamp = %v, want %v", sample.Timestamp, timestamp)
+ }
+ }
+ })
+ }
+}
+
+func TestTabularCSVParser_Labels(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+ input := `responsecode,httpmethod,user,totaltime
+200,GET,alice,50.5
+404,POST,bob,100.2`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // 2 rows * 2 numeric columns (responsecode, totaltime) = 4 samples
+ if len(samples) != 4 {
+ t.Fatalf("Expected 4 samples, got %d", len(samples))
+ }
+
+ // Find the responsecode and totaltime metrics for first row
+ var responsecodeMetric, totaltimeMetric *metrics.Sample
+ for i := range samples {
+ if samples[i].Labels["httpmethod"] == "GET" && samples[i].Labels["user"] == "alice" {
+ if strings.HasSuffix(samples[i].MetricName, "_responsecode") {
+ responsecodeMetric = &samples[i]
+ } else if strings.HasSuffix(samples[i].MetricName, "_totaltime") {
+ totaltimeMetric = &samples[i]
+ }
+ }
+ }
+
+ if responsecodeMetric == nil || totaltimeMetric == nil {
+ t.Fatalf("Could not find expected metrics")
+ }
+
+ // Check responsecode metric
+ if responsecodeMetric.Value != 200 {
+ t.Errorf("responsecode value = %f, want 200", responsecodeMetric.Value)
+ }
+ expectedLabels := map[string]string{
+ "httpmethod": "GET",
+ "user": "alice",
+ }
+ for key, expectedValue := range expectedLabels {
+ if actualValue, ok := responsecodeMetric.Labels[key]; !ok {
+ t.Errorf("responsecode metric missing label %s", key)
+ } else if actualValue != expectedValue {
+ t.Errorf("responsecode metric label %s = %s, want %s", key, actualValue, expectedValue)
+ }
+ }
+
+ // Check totaltime metric
+ if totaltimeMetric.Value != 50.5 {
+ t.Errorf("totaltime value = %f, want 50.5", totaltimeMetric.Value)
+ }
+ for key, expectedValue := range expectedLabels {
+ if actualValue, ok := totaltimeMetric.Labels[key]; !ok {
+ t.Errorf("totaltime metric missing label %s", key)
+ } else if actualValue != expectedValue {
+ t.Errorf("totaltime metric label %s = %s, want %s", key, actualValue, expectedValue)
+ }
+ }
+}
+
+func TestTabularCSVParser_ContextCancellation(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel() // Cancel immediately
+
+ timestamp := time.Now()
+ input := `col1,col2,col3
+value1,value2,value3
+value4,value5,value6`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ _, err := parser.Parse(ctx, reader)
+
+ if err != context.Canceled {
+ t.Errorf("Expected context.Canceled error, got %v", err)
+ }
+}
+
// TestTabularCSVParser_LargeFile generates 1000 data rows with a single
// numeric column and checks one sample per row is produced.
func TestTabularCSVParser_LargeFile(t *testing.T) {
	ctx := context.Background()
	timestamp := time.Now()

	// Generate a CSV with 1000 rows, 1 numeric column
	var builder strings.Builder
	builder.WriteString("col1,col2,col3\n")
	for i := 0; i < 1000; i++ {
		builder.WriteString(fmt.Sprintf("%d,value2,value3\n", i))
	}

	parser := NewTabularCSVParser("large_metric", timestamp, []string{})
	reader := strings.NewReader(builder.String())
	samples, err := parser.Parse(ctx, reader)

	if err != nil {
		t.Fatalf("Parse() error = %v", err)
	}

	// 1000 rows * 1 numeric column = 1000 samples
	if len(samples) != 1000 {
		t.Errorf("Expected 1000 samples, got %d", len(samples))
	}
}
+
// TestSanitizeLabelName covers the Prometheus label-name sanitizer:
// invalid characters, leading digits, repeated/leading/trailing
// underscores, and inputs that reduce to nothing ("unknown").
func TestSanitizeLabelName(t *testing.T) {
	tests := []struct {
		input    string
		expected string
	}{
		{"avg(totaltime)", "avg_totaltime"},
		{"sum(rcv)", "sum_rcv"},
		{"response-code", "response_code"},
		{"valid_label", "valid_label"},
		{"ValidLabel123", "ValidLabel123"},
		{"123invalid", "label_123invalid"},
		{"label__with___underscores", "label_with_underscores"},
		{"_leading", "leading"},
		{"trailing_", "trailing"},
		{"special!@#$chars", "special_chars"},
		{"", "unknown"},
		{"___", "unknown"},
		{"http.method", "http_method"},
		{"status_code", "status_code"},
	}

	for _, tt := range tests {
		t.Run(tt.input, func(t *testing.T) {
			result := sanitizeLabelName(tt.input)
			if result != tt.expected {
				t.Errorf("sanitizeLabelName(%q) = %q, want %q", tt.input, result, tt.expected)
			}
		})
	}
}
+
+func TestTabularCSVParser_WithSpecialCharacterHeaders(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+ input := `avg(totaltime),sum(rcv),response-code,http.method
+50.5,1102,200,GET
+100.2,2204,404,POST`
+
+ parser := NewTabularCSVParser("special_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // 2 rows * 3 numeric columns (avg(totaltime), sum(rcv), response-code) = 6 samples
+ if len(samples) != 6 {
+ t.Fatalf("Expected 6 samples, got %d", len(samples))
+ }
+
+ // Check that metric names are sanitized
+ expectedMetrics := []string{"special_metric_avg_totaltime", "special_metric_sum_rcv", "special_metric_response_code"}
+ foundMetrics := make(map[string]bool)
+ for _, sample := range samples {
+ foundMetrics[sample.MetricName] = true
+ }
+ for _, expected := range expectedMetrics {
+ if !foundMetrics[expected] {
+ t.Errorf("Missing expected metric %s", expected)
+ }
+ }
+
+ // Check that string column becomes label (http.method -> http_method)
+ for _, sample := range samples {
+ if _, ok := sample.Labels["http_method"]; !ok {
+ t.Errorf("Sample missing sanitized label http_method")
+ }
+ }
+
+ // Check values are correct for first row
+ for _, sample := range samples {
+ if sample.Labels["http_method"] == "GET" {
+ if strings.HasSuffix(sample.MetricName, "_avg_totaltime") {
+ if sample.Value != 50.5 {
+ t.Errorf("avg_totaltime value = %f, want 50.5", sample.Value)
+ }
+ } else if strings.HasSuffix(sample.MetricName, "_sum_rcv") {
+ if sample.Value != 1102 {
+ t.Errorf("sum_rcv value = %f, want 1102", sample.Value)
+ }
+ } else if strings.HasSuffix(sample.MetricName, "_response_code") {
+ if sample.Value != 200 {
+ t.Errorf("response_code value = %f, want 200", sample.Value)
+ }
+ }
+ }
+ }
+}
+
+func TestTabularCSVParser_DNSResolution(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // Test with localhost IP which should resolve on most systems
+ input := `ip,responsecode,count
+127.0.0.1,200,100`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // Should have 1 sample (count metric)
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // The ip label should either be resolved to a hostname or remain as IP if DNS failed
+ ipLabel := samples[0].Labels["ip"]
+ if ipLabel == "" {
+ t.Error("ip label is empty")
+ }
+
+ // If resolution succeeded, it should not be the original IP
+ // If it failed, it should still be the IP
+ t.Logf("IP label value: %s (original: 127.0.0.1)", ipLabel)
+}
+
+func TestTabularCSVParser_DNSResolutionDisabled(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ input := `ip,responsecode,count
+192.168.1.1,200,100`
+
+ // Pass empty slice - no DNS resolution
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // IP should remain unchanged
+ ipLabel := samples[0].Labels["ip"]
+ if ipLabel != "192.168.1.1" {
+ t.Errorf("IP label = %s, want 192.168.1.1 (DNS resolution should be disabled)", ipLabel)
+ }
+}
+
+func TestTabularCSVParser_DNSResolutionMultipleLabels(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ input := `source_ip,dest_ip,count
+127.0.0.1,192.168.1.1,100`
+
+ // Resolve both source_ip and dest_ip
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"source_ip", "dest_ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // Both labels should be present
+ if _, ok := samples[0].Labels["source_ip"]; !ok {
+ t.Error("source_ip label missing")
+ }
+ if _, ok := samples[0].Labels["dest_ip"]; !ok {
+ t.Error("dest_ip label missing")
+ }
+
+ t.Logf("source_ip: %s", samples[0].Labels["source_ip"])
+ t.Logf("dest_ip: %s", samples[0].Labels["dest_ip"])
+}
+
+func TestTabularCSVParser_DNSResolutionNonIPValue(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // ip column contains non-IP value
+ input := `ip,responsecode,count
+not-an-ip,200,100`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ if len(samples) != 1 {
+ t.Fatalf("Expected 1 sample, got %d", len(samples))
+ }
+
+ // Non-IP value should remain unchanged
+ ipLabel := samples[0].Labels["ip"]
+ if ipLabel != "not-an-ip" {
+ t.Errorf("IP label = %s, want not-an-ip (should not resolve non-IP values)", ipLabel)
+ }
+}
+
+func TestTabularCSVParser_DNSResolutionCaching(t *testing.T) {
+ ctx := context.Background()
+ timestamp := time.Now()
+
+ // Multiple rows with same IP
+ input := `ip,responsecode,count
+127.0.0.1,200,100
+127.0.0.1,404,50
+127.0.0.1,500,5`
+
+ parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"})
+ reader := strings.NewReader(input)
+ samples, err := parser.Parse(ctx, reader)
+
+ if err != nil {
+ t.Fatalf("Parse() error = %v", err)
+ }
+
+ // Should have 3 samples
+ if len(samples) != 3 {
+ t.Fatalf("Expected 3 samples, got %d", len(samples))
+ }
+
+ // All should have same resolved IP (cached result)
+ firstIP := samples[0].Labels["ip"]
+ for i, sample := range samples {
+ if sample.Labels["ip"] != firstIP {
+ t.Errorf("Sample %d has different IP label: %s vs %s (caching issue)", i, sample.Labels["ip"], firstIP)
+ }
+ }
+
+ // Check cache size
+ if parser.resolver.GetCacheSize() != 1 {
+ t.Errorf("Expected cache size 1 (one unique IP), got %d", parser.resolver.GetCacheSize())
+ }
+}
diff --git a/internal/resolver/dns_resolver.go b/internal/resolver/dns_resolver.go
new file mode 100644
index 0000000..22ab184
--- /dev/null
+++ b/internal/resolver/dns_resolver.go
@@ -0,0 +1,274 @@
+package resolver
+
+import (
+ "encoding/json"
+ "log"
+ "math/rand"
+ "net"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+)
+
// DNSResolver provides thread-safe reverse DNS resolution with caching and rate limiting.
// The zero value is not usable; construct with NewDNSResolver.
type DNSResolver struct {
	cache       map[string]*DNSCacheEntry // IP -> cache entry with metadata; values are replaced, not mutated in place
	mu          sync.RWMutex              // guards cache
	rateLimiter chan struct{}             // token bucket for rate limiting DNS lookups (refilled by a background goroutine)
	cacheFile   string                    // Path to persistent cache file (JSON array of DNSCacheEntry)
}
+
// DNSCacheEntry represents a cached DNS resolution with timestamp.
// An empty Hostname marks a failed lookup that may still be retried; once
// stored in the cache an entry is never mutated — retries replace it with a
// fresh entry.
type DNSCacheEntry struct {
	IP         string    `json:"ip"`
	Hostname   string    `json:"hostname"` // "" => lookup failed (or not yet succeeded)
	CachedAt   time.Time `json:"cached_at"`
	LastRetry  time.Time `json:"last_retry,omitempty"`  // Last retry attempt for failed lookups
	RetryCount int       `json:"retry_count,omitempty"` // Number of retry attempts so far
}
+
// Cache and retry tuning knobs.
const (
	// CacheTTL is the default time-to-live for cache entries (7 days).
	// Actual expiry is jittered per entry in cleanExpiredEntries.
	CacheTTL = 7 * 24 * time.Hour
	// FailedLookupRetryInterval is the minimum time between retries of a
	// failed DNS lookup (1 minute).
	FailedLookupRetryInterval = 1 * time.Minute
	// MaxRetries is the maximum number of retries for failed lookups;
	// after this many failures the cached failure becomes permanent until
	// the entry expires.
	MaxRetries = 5
)
+
+// NewDNSResolver creates a new DNS resolver with an empty cache
+// Rate limited to ~100 DNS lookups per second to avoid overwhelming DNS servers
+// Loads persistent cache from disk if available
+func NewDNSResolver() *DNSResolver {
+ // Determine cache file location
+ cacheDir := os.Getenv("HOME")
+ if cacheDir == "" {
+ cacheDir = "/tmp"
+ }
+ cacheFile := filepath.Join(cacheDir, ".epimetheus-dns-cache.json")
+
+ resolver := &DNSResolver{
+ cache: make(map[string]*DNSCacheEntry),
+ rateLimiter: make(chan struct{}, 10), // Allow 10 concurrent lookups
+ cacheFile: cacheFile,
+ }
+
+ // Load persistent cache from disk
+ resolver.loadCache()
+
+ // Clean expired entries on startup
+ resolver.cleanExpiredEntries()
+
+ // Start rate limiter goroutine that releases tokens every 10ms (100 per second)
+ go func() {
+ ticker := time.NewTicker(10 * time.Millisecond) // 100 requests/second
+ defer ticker.Stop()
+ for range ticker.C {
+ select {
+ case resolver.rateLimiter <- struct{}{}:
+ default:
+ // Channel full, skip this tick
+ }
+ }
+ }()
+
+ return resolver
+}
+
// ResolveIP attempts to resolve an IP address to a hostname via reverse DNS.
// Returns (hostname, true) on success and ("", false) when the input is not
// an IP address or resolution failed. Both successes and failures are
// cached; a failed lookup is retried at most MaxRetries times, no more often
// than FailedLookupRetryInterval.
//
// NOTE(review): the cache check and the subsequent lookup are not one atomic
// step, so concurrent callers that miss simultaneously may both issue the
// same DNS query. The cache stays consistent because entries are replaced
// rather than mutated; the duplicate lookup cost appears to be accepted.
func (r *DNSResolver) ResolveIP(ip string) (string, bool) {
	// Validate it's an IP address first — non-IP inputs are rejected
	// without touching the cache.
	if !isIPAddress(ip) {
		return "", false
	}

	// Fast path: check the cache under the read lock only.
	r.mu.RLock()
	entry, cached := r.cache[ip]
	r.mu.RUnlock()

	if cached {
		// An empty Hostname marks a previously failed lookup; retry it
		// only while the retry budget and backoff interval allow.
		if entry.Hostname == "" && entry.RetryCount < MaxRetries {
			// Check if enough time has passed since last retry
			if time.Since(entry.LastRetry) >= FailedLookupRetryInterval {
				// Retry the lookup
				hostname := r.retryLookup(ip, entry)
				return hostname, hostname != ""
			}
		}

		// Return cached result (hostname may be "" for a failed lookup).
		return entry.Hostname, entry.Hostname != ""
	}

	// Cache miss - resolve and cache the result
	hostname := r.resolveAndCache(ip)
	return hostname, hostname != ""
}
+
// resolveAndCache performs the reverse DNS lookup for ip and stores the
// outcome — success or failure — in the cache, so failures are retried on a
// schedule instead of on every call.
// Returns the hostname on success, "" on failure.
//
// Blocks until a rate-limiter token is available, bounding the overall
// lookup rate.
func (r *DNSResolver) resolveAndCache(ip string) string {
	// Wait for rate limiter token; blocks when recent lookups have
	// exhausted the bucket.
	<-r.rateLimiter

	// Perform reverse DNS lookup (PTR record).
	// NOTE(review): net.LookupAddr has no explicit timeout here — a slow
	// DNS server stalls this call; consider a context-aware net.Resolver.
	names, err := net.LookupAddr(ip)

	var hostname string
	now := time.Now()

	entry := &DNSCacheEntry{
		IP:       ip,
		CachedAt: now,
	}

	if err == nil && len(names) > 0 {
		// Take first result and remove trailing dot
		hostname = strings.TrimSuffix(names[0], ".")
		entry.Hostname = hostname
	} else {
		// Failed lookup - log and set up the entry for later retry.
		if err != nil {
			log.Printf("⚠️ DNS resolution failed for %s: %v (will retry)", ip, err)
		} else {
			log.Printf("⚠️ DNS resolution failed for %s: no PTR record found (will retry)", ip)
		}
		entry.LastRetry = now
		entry.RetryCount = 0
	}

	// Cache the result (write lock)
	r.mu.Lock()
	r.cache[ip] = entry
	r.mu.Unlock()

	return hostname
}
+
// retryLookup retries a previously failed DNS lookup for ip.
// On success the cache entry is replaced with a clean successful entry; on
// failure the retry counter is incremented while the original CachedAt is
// kept, so the entry still ages out on the normal TTL schedule.
// Returns the hostname on success, "" on failure.
//
// The DNS query itself happens before the write lock is taken; only the
// cache update (and its logging) runs under the lock.
func (r *DNSResolver) retryLookup(ip string, oldEntry *DNSCacheEntry) string {
	// Wait for rate limiter token
	<-r.rateLimiter

	// Perform reverse DNS lookup
	names, err := net.LookupAddr(ip)

	var hostname string
	now := time.Now()

	r.mu.Lock()
	defer r.mu.Unlock()

	if err == nil && len(names) > 0 {
		// Successful retry! Store a fresh entry with no retry metadata.
		hostname = strings.TrimSuffix(names[0], ".")
		log.Printf("✅ DNS retry successful for %s -> %s (after %d attempts)", ip, hostname, oldEntry.RetryCount+1)
		r.cache[ip] = &DNSCacheEntry{
			IP:       ip,
			Hostname: hostname,
			CachedAt: now,
		}
	} else {
		// Still failed - increment retry count
		newRetryCount := oldEntry.RetryCount + 1
		if err != nil {
			log.Printf("⚠️ DNS retry %d/%d failed for %s: %v", newRetryCount, MaxRetries, ip, err)
		} else {
			log.Printf("⚠️ DNS retry %d/%d failed for %s: no PTR record found", newRetryCount, MaxRetries, ip)
		}

		r.cache[ip] = &DNSCacheEntry{
			IP:         ip,
			Hostname:   "",
			CachedAt:   oldEntry.CachedAt, // keep original age so TTL expiry is unaffected
			LastRetry:  now,
			RetryCount: newRetryCount,
		}

		// Log if we've exhausted all retries
		if newRetryCount >= MaxRetries {
			log.Printf("❌ Giving up on DNS resolution for %s after %d attempts", ip, MaxRetries)
		}
	}

	return hostname
}
+
// isIPAddress reports whether s parses as a valid IPv4 or IPv6 address.
func isIPAddress(s string) bool {
	ip := net.ParseIP(s)
	return ip != nil
}
+
+// GetCacheSize returns the current cache size (useful for debugging/testing)
+func (r *DNSResolver) GetCacheSize() int {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return len(r.cache)
+}
+
+// loadCache loads the persistent DNS cache from disk
+func (r *DNSResolver) loadCache() {
+ data, err := os.ReadFile(r.cacheFile)
+ if err != nil {
+ // Cache file doesn't exist or can't be read - start with empty cache
+ return
+ }
+
+ var entries []DNSCacheEntry
+ if err := json.Unmarshal(data, &entries); err != nil {
+ // Invalid cache file - start with empty cache
+ return
+ }
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ // Load entries into cache
+ for i := range entries {
+ r.cache[entries[i].IP] = &entries[i]
+ }
+}
+
+// SaveCache saves the current DNS cache to disk
+func (r *DNSResolver) SaveCache() error {
+ r.mu.RLock()
+ entries := make([]DNSCacheEntry, 0, len(r.cache))
+ for _, entry := range r.cache {
+ entries = append(entries, *entry)
+ }
+ r.mu.RUnlock()
+
+ data, err := json.MarshalIndent(entries, "", " ")
+ if err != nil {
+ return err
+ }
+
+ return os.WriteFile(r.cacheFile, data, 0644)
+}
+
+// cleanExpiredEntries removes entries older than CacheTTL
+// Adds randomization to prevent all entries expiring at once
+func (r *DNSResolver) cleanExpiredEntries() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ now := time.Now()
+ for ip, entry := range r.cache {
+ // Add random jitter (±20%) to TTL to prevent thundering herd
+ jitter := time.Duration(float64(CacheTTL) * 0.2 * (0.5 - rand.Float64() * 2))
+ effectiveTTL := CacheTTL + jitter
+
+ age := now.Sub(entry.CachedAt)
+ if age > effectiveTTL {
+ delete(r.cache, ip)
+ }
+ }
+}
diff --git a/internal/resolver/dns_resolver_test.go b/internal/resolver/dns_resolver_test.go
new file mode 100644
index 0000000..6eab3a7
--- /dev/null
+++ b/internal/resolver/dns_resolver_test.go
@@ -0,0 +1,232 @@
+package resolver
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestNewDNSResolver(t *testing.T) {
+ resolver := NewDNSResolver()
+ if resolver == nil {
+ t.Fatal("NewDNSResolver returned nil")
+ }
+ if resolver.cache == nil {
+ t.Fatal("cache map not initialized")
+ }
+ if len(resolver.cache) != 0 {
+ t.Errorf("expected empty cache, got size %d", len(resolver.cache))
+ }
+}
+
+func TestIsIPAddress(t *testing.T) {
+ tests := []struct {
+ input string
+ expected bool
+ }{
+ {"192.168.1.1", true},
+ {"10.0.0.1", true},
+ {"255.255.255.255", true},
+ {"2001:db8::1", true},
+ {"::1", true},
+ {"not-an-ip", false},
+ {"", false},
+ {"256.1.1.1", false},
+ {"192.168.1", false},
+ {"server.example.com", false},
+ }
+
+ for _, tt := range tests {
+ result := isIPAddress(tt.input)
+ if result != tt.expected {
+ t.Errorf("isIPAddress(%q) = %v, want %v", tt.input, result, tt.expected)
+ }
+ }
+}
+
+func TestResolveIP_InvalidIP(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ invalidInputs := []string{
+ "not-an-ip",
+ "server.example.com",
+ "",
+ "invalid",
+ }
+
+ for _, input := range invalidInputs {
+ hostname, ok := resolver.ResolveIP(input)
+ if ok {
+ t.Errorf("ResolveIP(%q) returned ok=true for invalid IP", input)
+ }
+ if hostname != "" {
+ t.Errorf("ResolveIP(%q) returned hostname=%q for invalid IP", input, hostname)
+ }
+ }
+}
+
+func TestResolveIP_Localhost(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ // Test localhost resolution (should work on most systems)
+ hostname, ok := resolver.ResolveIP("127.0.0.1")
+
+ // We expect either success with "localhost" or failure (depending on system DNS config)
+ // Just verify the function doesn't panic and caches the result
+ if ok && hostname == "" {
+ t.Error("ResolveIP returned ok=true but empty hostname")
+ }
+
+ // Verify caching - second call should hit cache
+ initialCacheSize := resolver.GetCacheSize()
+ if initialCacheSize != 1 {
+ t.Errorf("expected cache size 1 after first lookup, got %d", initialCacheSize)
+ }
+
+ // Second lookup should use cache
+ hostname2, ok2 := resolver.ResolveIP("127.0.0.1")
+ if hostname2 != hostname || ok2 != ok {
+ t.Error("second lookup returned different result (cache not working)")
+ }
+
+ // Cache size should still be 1
+ if resolver.GetCacheSize() != 1 {
+ t.Errorf("cache size changed on second lookup")
+ }
+}
+
+func TestResolveIP_CachingFailedLookup(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ // Use a private IP that likely won't resolve
+ nonExistentIP := "192.168.255.254"
+
+ // First lookup
+ hostname1, ok1 := resolver.ResolveIP(nonExistentIP)
+
+ // Verify it's cached (even if it failed)
+ if resolver.GetCacheSize() != 1 {
+ t.Errorf("expected cache size 1 after first lookup, got %d", resolver.GetCacheSize())
+ }
+
+ // Second lookup should return same result from cache
+ hostname2, ok2 := resolver.ResolveIP(nonExistentIP)
+
+ if hostname1 != hostname2 {
+ t.Errorf("hostname changed between lookups: %q vs %q", hostname1, hostname2)
+ }
+ if ok1 != ok2 {
+ t.Errorf("ok changed between lookups: %v vs %v", ok1, ok2)
+ }
+
+ // Cache size should still be 1
+ if resolver.GetCacheSize() != 1 {
+ t.Errorf("cache size should remain 1, got %d", resolver.GetCacheSize())
+ }
+}
+
+func TestResolveIP_MultipleIPs(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ ips := []string{
+ "127.0.0.1",
+ "192.168.1.1",
+ "10.0.0.1",
+ }
+
+ // Resolve all IPs
+ for _, ip := range ips {
+ resolver.ResolveIP(ip)
+ }
+
+ // Verify all are cached
+ if resolver.GetCacheSize() != len(ips) {
+ t.Errorf("expected cache size %d, got %d", len(ips), resolver.GetCacheSize())
+ }
+
+ // Resolve again to verify cache hits
+ for _, ip := range ips {
+ resolver.ResolveIP(ip)
+ }
+
+ // Cache size should not change
+ if resolver.GetCacheSize() != len(ips) {
+ t.Errorf("cache size changed on second pass")
+ }
+}
+
+func TestResolveIP_Concurrent(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ // Test concurrent access to ensure thread safety
+ var wg sync.WaitGroup
+ numGoroutines := 50
+
+ for i := 0; i < numGoroutines; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+
+ // Each goroutine resolves a few IPs
+ ips := []string{
+ "127.0.0.1",
+ "192.168.1.1",
+ "10.0.0.1",
+ }
+
+ for _, ip := range ips {
+ resolver.ResolveIP(ip)
+ }
+ }(i)
+ }
+
+ wg.Wait()
+
+ // Despite concurrent access, cache should only have unique IPs
+ expectedSize := 3
+ if resolver.GetCacheSize() != expectedSize {
+ t.Errorf("expected cache size %d after concurrent access, got %d",
+ expectedSize, resolver.GetCacheSize())
+ }
+}
+
+func TestResolveIP_IPv6(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ // Test IPv6 localhost
+ hostname, ok := resolver.ResolveIP("::1")
+
+ // Should be cached regardless of success
+ if resolver.GetCacheSize() != 1 {
+ t.Errorf("expected cache size 1, got %d", resolver.GetCacheSize())
+ }
+
+ // If successful, hostname should not be empty
+ if ok && hostname == "" {
+ t.Error("ResolveIP returned ok=true but empty hostname for IPv6")
+ }
+}
+
+func TestGetCacheSize(t *testing.T) {
+ resolver := NewDNSResolver()
+
+ if resolver.GetCacheSize() != 0 {
+ t.Errorf("expected initial cache size 0, got %d", resolver.GetCacheSize())
+ }
+
+ // Add entries
+ resolver.ResolveIP("127.0.0.1")
+ if resolver.GetCacheSize() != 1 {
+ t.Errorf("expected cache size 1 after first entry, got %d", resolver.GetCacheSize())
+ }
+
+ resolver.ResolveIP("192.168.1.1")
+ if resolver.GetCacheSize() != 2 {
+ t.Errorf("expected cache size 2 after second entry, got %d", resolver.GetCacheSize())
+ }
+
+ // Resolving same IP shouldn't increase size
+ resolver.ResolveIP("127.0.0.1")
+ if resolver.GetCacheSize() != 2 {
+ t.Errorf("expected cache size to remain 2, got %d", resolver.GetCacheSize())
+ }
+}
diff --git a/internal/version/version.go b/internal/version/version.go
new file mode 100644
index 0000000..cdd04c6
--- /dev/null
+++ b/internal/version/version.go
@@ -0,0 +1,4 @@
+package version
+
// Version is the current version of the application.
// NOTE(review): the original comment said "prometheus-pusher", but the rest
// of the repository names the binary "epimetheus" — confirm the intended
// project name.
const Version = "0.0.0"
diff --git a/internal/watcher/file_watcher.go b/internal/watcher/file_watcher.go
new file mode 100644
index 0000000..63ccb08
--- /dev/null
+++ b/internal/watcher/file_watcher.go
@@ -0,0 +1,86 @@
+package watcher
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+)
+
// FileWatcher monitors a file for changes based on modification time.
type FileWatcher struct {
	filePath     string        // path of the file being watched
	pollInterval time.Duration // how often to stat the file
	// lastModTime is written by the Watch goroutine and read by
	// GetLastModTime without synchronization.
	// NOTE(review): this is a data race under `go test -race` if
	// GetLastModTime is called while Watch is running — confirm callers.
	lastModTime time.Time
}
+
+// NewFileWatcher creates a new file watcher.
+func NewFileWatcher(filePath string, pollInterval time.Duration) *FileWatcher {
+ return &FileWatcher{
+ filePath: filePath,
+ pollInterval: pollInterval,
+ }
+}
+
+// Watch starts watching the file and returns a channel that signals changes.
+// If the file doesn't exist yet, it will wait until it appears.
+func (fw *FileWatcher) Watch(ctx context.Context) (<-chan struct{}, error) {
+ changeChan := make(chan struct{}, 1)
+
+ go func() {
+ defer close(changeChan)
+ ticker := time.NewTicker(fw.pollInterval)
+ defer ticker.Stop()
+
+ fileExists := false
+
+ // Wait for file to exist
+ for !fileExists {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ info, err := os.Stat(fw.filePath)
+ if err != nil {
+ // File doesn't exist yet, keep waiting
+ fmt.Printf("Waiting for file to exist: %s\n", fw.filePath)
+ continue
+ }
+
+ // File now exists!
+ fileExists = true
+ fw.lastModTime = info.ModTime()
+ // Send initial change to process file
+ changeChan <- struct{}{}
+ fmt.Printf("File detected, starting watch: %s\n", fw.filePath)
+ }
+ }
+
+ // Now watch for changes
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ info, err := os.Stat(fw.filePath)
+ if err != nil {
+ // File might have been deleted or is temporarily unavailable
+ continue
+ }
+
+ modTime := info.ModTime()
+ if modTime.After(fw.lastModTime) {
+ fw.lastModTime = modTime
+ changeChan <- struct{}{}
+ }
+ }
+ }
+ }()
+
+ return changeChan, nil
+}
+
// GetLastModTime returns the last known modification time of the file.
// NOTE(review): this reads lastModTime without synchronization while the
// Watch goroutine writes it — a data race under `go test -race`. Confirm it
// is only called after the watcher has stopped, or guard the field with a
// mutex.
func (fw *FileWatcher) GetLastModTime() time.Time {
	return fw.lastModTime
}
diff --git a/logo.png b/logo.png
new file mode 100644
index 0000000..a98b33d
--- /dev/null
+++ b/logo.png
Binary files differ
diff --git a/run.sh b/run.sh
new file mode 100755
index 0000000..38637cf
--- /dev/null
+++ b/run.sh
@@ -0,0 +1,31 @@
#!/bin/bash

# Simple script to run Epimetheus.
# Automatically sets up port-forwarding and runs the binary.
#
# Fixes over the previous version:
#  - With `set -e`, a non-zero exit from ./epimetheus aborted the script
#    BEFORE the port-forward cleanup ran, leaking the kubectl process.
#    Cleanup now runs from an EXIT trap, which also covers Ctrl+C.

set -e

echo "Starting Epimetheus..."
echo ""
echo "Step 1: Setting up port-forward to Pushgateway..."
kubectl port-forward -n monitoring svc/pushgateway 9091:9091 > /tmp/pushgateway-port-forward.log 2>&1 &
PF_PID=$!

# Always tear down the port-forward: on normal exit, on error, and on Ctrl+C.
cleanup() {
    echo ""
    echo "Cleaning up port-forward..."
    kill "$PF_PID" 2>/dev/null || true
}
trap cleanup EXIT

# Wait for port-forward to be ready
sleep 2

echo "Step 2: Running epimetheus binary (realtime mode)..."
echo "Press Ctrl+C to stop"
echo ""

# Run the binary and capture its exit status without letting `set -e`
# terminate the script before the trap fires.
EXIT_CODE=0
./epimetheus -mode=realtime -continuous || EXIT_CODE=$?

echo "Done!"
exit $EXIT_CODE
diff --git a/test-data/watch-clickhouse-test.csv b/test-data/watch-clickhouse-test.csv
new file mode 100644
index 0000000..b4ca2b2
--- /dev/null
+++ b/test-data/watch-clickhouse-test.csv
@@ -0,0 +1,12 @@
+# Test dataset for watch mode with ClickHouse ingestion
+# Tabular CSV: first row = headers, numeric columns = metrics, string columns = labels
+# Use with: ./epimetheus -mode=watch -file=test-data/watch-clickhouse-test.csv -metric-name=watch_test -clickhouse=http://localhost:8123 -prometheus=
+
+instance,env,requests_total,latency_ms,active_connections
+web1,prod,100,45.2,12
+web2,prod,85,38.1,8
+web3,prod,120,52.3,15
+api1,staging,200,120.5,25
+api2,staging,180,95.2,22
+db1,prod,50,5.1,3
+cache1,prod,500,2.3,50
diff --git a/verify-clickhouse.sh b/verify-clickhouse.sh
new file mode 100755
index 0000000..5819f18
--- /dev/null
+++ b/verify-clickhouse.sh
@@ -0,0 +1,52 @@
#!/bin/bash
# Verify that epimetheus metrics were successfully ingested into ClickHouse.
# Usage: ./verify-clickhouse.sh [clickhouse_url] [table_name]
# Default: http://localhost:8123, epimetheus_metrics
#
# The queries below are passed pre-URL-encoded in the query string
# (%20 = space, %2C = comma); edit them with care.
# NOTE(review): assumes an unauthenticated ClickHouse HTTP endpoint and a
# table whose name needs no quoting — confirm for production setups.

set -e

CLICKHOUSE_URL="${1:-http://localhost:8123}"
TABLE="${2:-epimetheus_metrics}"

echo "Verifying ClickHouse ingestion..."
echo "  URL:   $CLICKHOUSE_URL"
echo "  Table: $TABLE"
echo ""

# Check connectivity via ClickHouse's /ping endpoint.
if ! curl -sS "${CLICKHOUSE_URL}/ping" > /dev/null 2>&1; then
    echo "ERROR: Cannot connect to ClickHouse at $CLICKHOUSE_URL"
    echo "  Make sure ClickHouse is running: sudo systemctl start clickhouse-server"
    exit 1
fi

echo "✓ ClickHouse is reachable"
echo ""

# Query 1: Row count — also serves as the "table exists and is non-empty" gate.
echo "--- Row count ---"
COUNT=$(curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20count()%20FROM%20${TABLE}" 2>/dev/null | tail -1)
if [ -z "$COUNT" ] || [ "$COUNT" = "0" ]; then
    echo "ERROR: Table $TABLE is empty or does not exist"
    echo "  Run: ./epimetheus -mode=watch -file=test-data/watch-clickhouse-test.csv -metric-name=watch_test -clickhouse=$CLICKHOUSE_URL -prometheus="
    exit 1
fi
echo "Total rows: $COUNT"
echo ""

# Query 2: Distinct metric names present in the table.
echo "--- Metrics in table ---"
curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20distinct%20metric%20FROM%20${TABLE}%20ORDER%20BY%20metric%20FORMAT%20PrettyCompact" 2>/dev/null
echo ""

# Query 3: Most recent rows, for a quick eyeball of labels/values.
echo "--- Sample rows (last 5) ---"
curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20metric%2C%20labels%2C%20value%2C%20timestamp%20FROM%20${TABLE}%20ORDER%20BY%20timestamp%20DESC%20LIMIT%205%20FORMAT%20PrettyCompact" 2>/dev/null
echo ""

# Query 4: Row counts aggregated per metric.
echo "--- Rows per metric ---"
curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20metric%2C%20count()%20AS%20cnt%20FROM%20${TABLE}%20GROUP%20BY%20metric%20ORDER%20BY%20cnt%20DESC%20FORMAT%20PrettyCompact" 2>/dev/null
echo ""

echo "✅ ClickHouse verification complete - data is present and queryable"