diff options
41 files changed, 5920 insertions, 0 deletions
diff --git a/AGENT.md b/AGENT.md new file mode 100644 index 0000000..d5c22e7 --- /dev/null +++ b/AGENT.md @@ -0,0 +1,63 @@ +Follow ~/git/conf/snippets/go/go-projects.md + +## Grafana Dashboard Guidelines + +When creating or updating Grafana dashboards: + +### Sorting Requirements +**ALWAYS ensure ALL panels are sorted by value (descending):** + +1. **Time Series Panels:** + - Add to `options.legend`: `"sortBy": "Last", "sortDesc": true` + +2. **Bar Gauge Panels:** + - Use `sort_desc()` in PromQL queries + - Example: `sort_desc(topk(10, sum by (label) (metric)))` + +3. **Pie/Donut Chart Panels:** + - Add to `options.legend`: `"sortBy": "Value", "sortDesc": true` + +4. **Table Panels:** + - Add to `options`: `"sortBy": [{"displayName": "ColumnName", "desc": true}]` + +### Example Panel Configuration + +**Time Series:** +```json +{ + "type": "timeseries", + "options": { + "legend": { + "displayMode": "table", + "placement": "right", + "sortBy": "Last", + "sortDesc": true, + "calcs": ["lastNotNull", "mean", "max"] + } + } +} +``` + +**Bar Gauge:** +```json +{ + "type": "bargauge", + "targets": [{ + "expr": "sort_desc(topk(15, sum by (cust) (metric)))" + }], + "options": { + "orientation": "horizontal", + "displayMode": "gradient" + } +} +``` + +**Table:** +```json +{ + "type": "table", + "options": { + "sortBy": [{"displayName": "Count", "desc": true}] + } +} +``` diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..02d954c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +Follow AGENT.md diff --git a/Magefile.go b/Magefile.go new file mode 100644 index 0000000..3cce6e0 --- /dev/null +++ b/Magefile.go @@ -0,0 +1,253 @@ +//go:build mage +// +build mage + +package main + +import ( + "fmt" + "os" + + "github.com/magefile/mage/mg" + "github.com/magefile/mage/sh" +) + +const ( + binaryName = "epimetheus" + mainPath = "./cmd/epimetheus" +) + +// Default target to run when none is specified +var Default = Build + +// Build compiles the epimetheus binary +func 
Build() error { + fmt.Println("Building epimetheus...") + return sh.RunV("go", "build", "-o", binaryName, mainPath) +} + +// Install installs the binary to $GOPATH/bin +func Install() error { + fmt.Println("Installing epimetheus...") + return sh.RunV("go", "install", mainPath) +} + +// Run executes the epimetheus binary in realtime mode +func Run() error { + mg.Deps(Build) + fmt.Println("Running epimetheus in realtime mode...") + return sh.RunV("./"+binaryName, "-mode=realtime", "-continuous") +} + +// RunHistoric runs epimetheus in historic mode +func RunHistoric() error { + mg.Deps(Build) + fmt.Println("Running epimetheus in historic mode (24 hours ago)...") + return sh.RunV("./"+binaryName, "-mode=historic", "-hours-ago=24") +} + +// RunAuto runs epimetheus in auto mode with a file +func RunAuto(file string) error { + mg.Deps(Build) + if file == "" { + return fmt.Errorf("file parameter required: mage RunAuto <file>") + } + fmt.Printf("Running epimetheus in auto mode with file: %s\n", file) + return sh.RunV("./"+binaryName, "-mode=auto", "-file="+file) +} + +// RunWatchClickHouse runs epimetheus in watch mode with ClickHouse ingestion +func RunWatchClickHouse(file string) error { + mg.Deps(Build) + if file == "" { + file = "test-data/watch-clickhouse-test.csv" + } + fmt.Printf("Running epimetheus in watch mode with ClickHouse (file: %s)\n", file) + return sh.RunV("./"+binaryName, "-mode=watch", "-file="+file, "-metric-name=watch_test", + "-clickhouse=http://localhost:8123", "-prometheus=") +} + +// Test runs all tests +func Test() error { + fmt.Println("Running tests...") + return sh.RunV("go", "test", "./...", "-v") +} + +// TestCoverage runs tests with coverage report +func TestCoverage() error { + fmt.Println("Running tests with coverage...") + if err := sh.RunV("go", "test", "./...", "-cover", "-coverprofile=coverage.out"); err != nil { + return err + } + return sh.RunV("go", "tool", "cover", "-html=coverage.out", "-o", "coverage.html") +} + +// TestRace runs 
tests with race detector +func TestRace() error { + fmt.Println("Running tests with race detector...") + return sh.RunV("go", "test", "./...", "-race", "-v") +} + +// Benchmark runs all benchmarks +func Benchmark() error { + fmt.Println("Running benchmarks...") + return sh.RunV("go", "test", "./...", "-bench=.", "-benchmem") +} + +// Lint runs golangci-lint +func Lint() error { + fmt.Println("Running linter...") + return sh.RunV("golangci-lint", "run", "./...") +} + +// Fmt formats all Go code +func Fmt() error { + fmt.Println("Formatting code...") + return sh.RunV("go", "fmt", "./...") +} + +// Vet runs go vet +func Vet() error { + fmt.Println("Running go vet...") + return sh.RunV("go", "vet", "./...") +} + +// Tidy runs go mod tidy +func Tidy() error { + fmt.Println("Tidying dependencies...") + return sh.RunV("go", "mod", "tidy") +} + +// Clean removes build artifacts +func Clean() error { + fmt.Println("Cleaning build artifacts...") + files := []string{ + binaryName, + "coverage.out", + "coverage.html", + } + for _, f := range files { + if err := sh.Rm(f); err != nil && !os.IsNotExist(err) { + return err + } + } + return nil +} + +// Generate runs go generate +func Generate() error { + fmt.Println("Running go generate...") + return sh.RunV("go", "generate", "./...") +} + +// Version prints the version +func Version() error { + fmt.Println("Printing version...") + mg.Deps(Build) + return sh.RunV("./"+binaryName, "-version") +} + +// All runs format, vet, test, and build +func All() { + mg.Deps(Fmt, Vet, Test, Build) +} + +// CI runs the full CI pipeline (format check, vet, test, build) +func CI() error { + fmt.Println("Running CI pipeline...") + mg.Deps(Tidy, Vet, Test) + return Build() +} + +// Dev starts development mode with port-forwarding +func Dev() error { + mg.Deps(Build) + fmt.Println("Starting development mode...") + fmt.Println("Setting up port-forward to Pushgateway...") + + // Start port-forward in background + portForwardCmd := sh.RunCmd("kubectl", 
"port-forward", "-n", "monitoring", "svc/pushgateway", "9091:9091") + go func() { + if err := portForwardCmd(); err != nil { + fmt.Printf("Port-forward error: %v\n", err) + } + }() + + fmt.Println("Running epimetheus in realtime mode...") + return sh.RunV("./"+binaryName, "-mode=realtime", "-continuous") +} + +// GenerateTestData creates test data files +func GenerateTestData() error { + fmt.Println("Generating test data...") + return sh.RunV("./generate-test-data.sh") +} + +// Backfill runs backfill for the last 48 hours +func Backfill() error { + mg.Deps(Build) + fmt.Println("Running backfill (last 48 hours)...") + return sh.RunV("./"+binaryName, "-mode=backfill", "-start-hours=48", "-end-hours=0", "-interval=1") +} + +// Benchmark100MB runs the 100MB benchmark +func Benchmark100MB() error { + fmt.Println("Running 100MB benchmark...") + return sh.RunV("./benchmark-100mb.sh") +} + +// Benchmark1GB runs the 1GB benchmark +func Benchmark1GB() error { + fmt.Println("Running 1GB benchmark...") + return sh.RunV("./benchmark-1gb.sh") +} + +// CleanupBenchmarkData removes benchmark data from Prometheus +func CleanupBenchmarkData() error { + fmt.Println("Cleaning up benchmark data...") + return sh.RunV("./cleanup-benchmark-data.sh") +} + +// CleanupBenchmarkMetrics removes benchmark metric files +func CleanupBenchmarkMetrics() error { + fmt.Println("Cleaning up benchmark metric files...") + return sh.RunV("./cleanup-benchmark-metrics.sh") +} + +// DeployDashboard deploys the Grafana dashboard +func DeployDashboard() error { + fmt.Println("Deploying Grafana dashboard...") + return sh.RunV("./deploy-dashboard.sh") +} + +// Help prints available targets +func Help() { + fmt.Println("Available targets:") + fmt.Println(" build - Build the epimetheus binary (default)") + fmt.Println(" install - Install the binary to $GOPATH/bin") + fmt.Println(" run - Build and run in realtime mode") + fmt.Println(" runHistoric - Build and run in historic mode") + fmt.Println(" runAuto <file> - 
Build and run in auto mode with file") + fmt.Println(" runWatchClickHouse [file] - Build and run watch mode with ClickHouse (default: test-data/watch-clickhouse-test.csv)") + fmt.Println(" test - Run all tests") + fmt.Println(" testCoverage - Run tests with coverage report") + fmt.Println(" testRace - Run tests with race detector") + fmt.Println(" benchmark - Run Go benchmarks") + fmt.Println(" lint - Run golangci-lint") + fmt.Println(" fmt - Format all Go code") + fmt.Println(" vet - Run go vet") + fmt.Println(" tidy - Run go mod tidy") + fmt.Println(" clean - Remove build artifacts") + fmt.Println(" generate - Run go generate") + fmt.Println(" version - Print version") + fmt.Println(" all - Run fmt, vet, test, and build") + fmt.Println(" ci - Run full CI pipeline") + fmt.Println(" dev - Start development mode with port-forwarding") + fmt.Println(" generateTestData - Generate test data files") + fmt.Println(" backfill - Run backfill for last 48 hours") + fmt.Println(" benchmark100MB - Run 100MB benchmark") + fmt.Println(" benchmark1GB - Run 1GB benchmark") + fmt.Println(" cleanupBenchmarkData - Clean up benchmark data from Prometheus") + fmt.Println(" cleanupBenchmarkMetrics - Clean up benchmark metric files") + fmt.Println(" deployDashboard - Deploy Grafana dashboard") + fmt.Println(" help - Print this help message") +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..ba10a76 --- /dev/null +++ b/README.md @@ -0,0 +1,1000 @@ +<div align="center"> + <img src="logo.png" alt="Epimetheus Logo" width="400"/> +</div> + +# Epimetheus + +A versatile Go tool for pushing metrics to Prometheus with support for both realtime and historic data ingestion. + +## Why "Epimetheus"? + +In Greek mythology, [Epimetheus](https://en.wikipedia.org/wiki/Epimetheus_(mythology)) is Prometheus's brother, whose name means "afterthought" or "hindsight" (while Prometheus means "forethought"). 
This name cleverly captures the tool's purpose: bringing data to Prometheus **after** collection, whether it's historic data from hours, days, or weeks ago, or realtime data pushed on-demand. + +While Epimetheus is sometimes depicted as foolish in myths (he accepted Pandora's box despite warnings), this tool embraces the "afterthought" aspect productively - it's never too late to bring your metrics home to Prometheus! + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ Epimetheus │ +│ (Metrics Ingestion Tool) │ +│ │ +│ Modes: │ +│ • Realtime - Current metrics (< 5 min old) │ +│ • Historic - Historic metrics (≥ 5 min old) │ +│ • Backfill - Range of historic data │ +│ • Auto - Automatic routing based on timestamp age │ +└─────────────────────────────────────────────────────────────────────────┘ + │ │ + │ Realtime Data │ Historic Data + │ (via HTTP POST) │ (via Remote Write API) + │ Uses "now" timestamp │ Preserves timestamps + ▼ ▼ +┌─────────────────────┐ ┌─────────────────────┐ +│ Pushgateway │ │ Prometheus │ +│ (Port 9091) │ │ (Port 9090) │ +│ │ │ │ +│ • Buffers metrics │ │ Remote Write API: │ +│ • Scraped by │──── Scraped ─────▶ │ /api/v1/write │ +│ Prometheus │ every 15-30s │ │ +│ • No timestamp │ │ Feature Required: │ +│ preservation │ │ --enable-feature= │ +│ │ │ remote-write- │ +│ │ │ receiver │ +└─────────────────────┘ └─────────────────────┘ + │ + │ Prometheus Query API + │ /api/v1/query + ▼ + ┌─────────────────────┐ + │ Grafana │ + │ (Port 3000) │ + │ │ + │ • Prometheus as │ + │ datasource │ + │ • Dashboards: │ + │ - Epimetheus │ + │ Test Metrics │ + │ • Auto-refresh │ + └─────────────────────┘ +``` + +### Data Flow + +1. **Realtime Path** (for current data): + - Epimetheus → Pushgateway (HTTP POST) + - Prometheus scrapes Pushgateway periodically + - Timestamp = "now" when Prometheus scrapes + +2. 
**Historic Path** (for old data): + - Epimetheus → Prometheus Remote Write API (HTTP POST) + - Direct write to Prometheus TSDB + - Timestamp preserved from original data + +3. **Visualization**: + - Grafana queries Prometheus + - Displays metrics in dashboards + - Auto-refresh every 10 seconds + +## Overview + +**epimetheus** is a standalone binary that: +- **Generates** realistic example metrics simulating production applications +- **Pushes** metrics via Pushgateway (realtime) or Remote Write API (historic) +- **Automatically detects** timestamp age and chooses the optimal ingestion method +- **Supports** multiple data formats (CSV, JSON) and all Prometheus metric types +- **Provides** Grafana dashboard for visualizing test metrics + +## Quick Start + +### 1. Deploy Pushgateway (one-time setup) + +The Pushgateway Helm chart is available in the [conf repository](https://codeberg.org/snonux/conf) at `f3s/pushgateway/helm-chart`. + +```bash +# Clone the conf repository if you haven't already +git clone https://codeberg.org/snonux/conf.git +cd conf/f3s/pushgateway/helm-chart + +# Deploy Pushgateway +helm upgrade --install pushgateway . -n monitoring --create-namespace +``` + +Alternatively, deploy Pushgateway using the official chart: + +```bash +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm install pushgateway prometheus-community/prometheus-pushgateway -n monitoring --create-namespace +``` + +### 2. Run in Realtime Mode + +```bash +# Port-forward Pushgateway +kubectl port-forward -n monitoring svc/pushgateway 9091:9091 & + +# Push test metrics continuously +cd /home/paul/git/conf/f3s/epimetheus +./epimetheus -mode=realtime -continuous +``` + +The binary pushes metrics every 15 seconds. Press Ctrl+C to stop. + +### 3. 
View Metrics + +```bash +# Pushgateway UI +open http://localhost:9091 + +# Prometheus UI +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 & +open http://localhost:9090 +``` + +## Operating Modes + +### 👁️ Watch Mode +Monitor CSV files for changes and push metrics to Prometheus with file modification timestamps. + +**Works with ANY CSV format** - automatically detects numeric vs string columns and sanitizes names. + +**NEW: Automatic DNS Resolution** - IP addresses are automatically resolved to hostnames for better observability in Grafana. + +```bash +./epimetheus -mode=watch \ + -file=mydata.csv \ + -metric-name=myapp \ + -prometheus=http://localhost:9090/api/v1/write +``` + +**Features:** +- 🔍 **Format-agnostic**: Works with any tabular CSV structure +- 📊 **Automatic detection**: Numeric columns → metrics, String columns → labels +- 🏷️ **Name sanitization**: `min(potatoes)`, `avg(time)`, `p99(latency)` → valid metric names +- 🌐 **DNS Resolution**: IP addresses → hostnames (e.g., `10.50.52.61` → `foo.example.lan`) +- 💾 **Smart Caching**: In-memory cache prevents redundant DNS lookups +- ⏱️ **Timestamp preservation**: Uses file modification time +- 🔄 **Continuous monitoring**: Polls file every 1 second +- 💪 **Error resilient**: Continues watching despite failures +- 🎯 **Remote Write**: Pushes to Prometheus (preserves timestamps) + +**CSV Format:** +Works with any tabular CSV: +- First row: column headers (automatically sanitized) +- Subsequent rows: data values +- Column names can be anything: `min(x)`, `avg(y)`, `p99(latency)`, etc. 
+ +**Example 1** - Web metrics: +```csv +avg(response_time),p99(latency),endpoint,method +45.2,120.5,/api/users,GET +52.1,135.8,/api/orders,POST +``` + +Generates: +```promql +web_avg_response_time{endpoint="/api/users",method="GET"} 45.2 +web_p99_latency{endpoint="/api/users",method="GET"} 120.5 +web_avg_response_time{endpoint="/api/orders",method="POST"} 52.1 +web_p99_latency{endpoint="/api/orders",method="POST"} 135.8 +``` + +**Example 2** - Food metrics: +```csv +min(potatoes),last(coke),avg(price),country,store_type +5.2,10.5,12.99,USA,grocery +3.8,8.2,9.99,Canada,convenience +``` + +Generates: +```promql +food_min_potatoes{country="USA",store_type="grocery"} 5.2 +food_last_coke{country="USA",store_type="grocery"} 10.5 +food_avg_price{country="USA",store_type="grocery"} 12.99 +# ... etc +``` + +Each row generates N samples (N = number of numeric columns). + +See [CSV-FORMAT-FLEXIBILITY.md](CSV-FORMAT-FLEXIBILITY.md) for more examples. + +**Options:** +- `-file` - CSV file to watch (required) +- `-metric-name` - Base metric name (required, e.g., `food`, `network`, `database`) +- `-prometheus` - Prometheus Remote Write URL (default: http://localhost:9090/api/v1/write) +- `-clickhouse` - ClickHouse HTTP URL (e.g. 
http://localhost:8123) to also ingest metrics +- `-clickhouse-table` - ClickHouse table name (default: epimetheus_metrics) +- `-job` - Job name for metrics (default: example_metrics_pusher) +- `-resolve-ip-labels` - Additional IP labels to resolve via DNS (default: ip is always resolved) + +**ClickHouse Support:** +Watch mode can ingest to ClickHouse in addition to (or instead of) Prometheus: + +```bash +# Ingest to both Prometheus and ClickHouse +./epimetheus -mode=watch -file=data.csv -metric-name=myapp \ + -prometheus=http://localhost:9090/api/v1/write \ + -clickhouse=http://localhost:8123 + +# ClickHouse only (use -prometheus= to disable Prometheus) +./epimetheus -mode=watch -file=test-data/watch-clickhouse-test.csv \ + -metric-name=watch_test -clickhouse=http://localhost:8123 -prometheus= + +# Verify data in ClickHouse +./verify-clickhouse.sh +``` + +**DNS Resolution:** +By default, the `ip` label is automatically resolved to a hostname. To resolve additional IP labels: + +```bash +./epimetheus -mode=watch \ + -file=network.csv \ + -metric-name=network \ + -resolve-ip-labels=source_ip,dest_ip +``` + +This will resolve: `ip` (default) + `source_ip` + `dest_ip` + +**Example:** +- Input: `ip="10.50.52.61"` +- Output: `ip="foo.example.lan"` +- Failed lookups: IP remains unchanged + +**Documentation:** +- [DNS-RESOLUTION-FEATURE.md](DNS-RESOLUTION-FEATURE.md) - Complete DNS resolution guide +- [CSV-FORMAT-FLEXIBILITY.md](CSV-FORMAT-FLEXIBILITY.md) - Works with ANY CSV format +- [DTAIL-METRICS-EXAMPLE.md](DTAIL-METRICS-EXAMPLE.md) - Detailed dtail.csv example + +### 🔄 Realtime Mode (Default) +Push current metrics to Pushgateway with "now" timestamp. 
+ +```bash +./epimetheus -mode=realtime -continuous +``` + +**Options:** +- `-pushgateway` - Pushgateway URL (default: http://localhost:9091) +- `-job` - Job name (default: example_metrics_pusher) +- `-continuous` - Keep pushing every 15 seconds + +### ⏰ Historic Mode +Push a single datapoint from the past using Remote Write API. + +```bash +# Port-forward Prometheus +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 & + +# Push data from 24 hours ago +./epimetheus -mode=historic -hours-ago=24 +``` + +**Options:** +- `-prometheus` - Prometheus URL (default: http://localhost:9090/api/v1/write) +- `-hours-ago` - Hours in the past (default: 24) + +### 📦 Backfill Mode +Import a range of historic data points. + +```bash +# Backfill last 48 hours with 1-hour intervals +./epimetheus -mode=backfill -start-hours=48 -end-hours=0 -interval=1 + +# Backfill last week with 6-hour intervals +./epimetheus -mode=backfill -start-hours=168 -end-hours=0 -interval=6 +``` + +**Options:** +- `-start-hours` - Start time in hours ago +- `-end-hours` - End time in hours ago (0 = now) +- `-interval` - Interval between points in hours + +### 🤖 Auto Mode (Recommended!) +Automatically detect timestamp age and route to the correct ingestion method. 
+ +```bash +# Generate test data +./generate-test-data.sh + +# Import mixed current and historic data +./epimetheus -mode=auto -file=test-all-ages.csv +``` + +**Detection Logic:** +- Data < 5 minutes old → Pushgateway (realtime) +- Data ≥ 5 minutes old → Remote Write (historic) + +**Options:** +- `-file` - Input file path +- `-format` - Data format: csv or json (default: csv) +- `-pushgateway` - Pushgateway URL +- `-prometheus` - Prometheus Remote Write URL + +## Data Formats + +### CSV Format + +```csv +# Format: metric_name,labels,value,timestamp_ms +# Labels: key1=value1;key2=value2 +epimetheus_test_requests_total,instance=web1;env=prod,100,1767125148000 +epimetheus_test_temperature_celsius,instance=web2,22.5,1767038748000 + +# Timestamp is optional (uses "now" if omitted) +epimetheus_test_active_connections,instance=web3,42, +``` + +### JSON Format + +```json +[ + { + "metric": "epimetheus_test_requests_total", + "labels": {"instance": "web1", "env": "prod"}, + "value": 100, + "timestamp_ms": 1767125148000 + }, + { + "metric": "epimetheus_test_temperature_celsius", + "labels": {"instance": "web2"}, + "value": 22.5, + "timestamp_ms": 1767038748000 + } +] +``` + +## Test Metrics + +All generated metrics use the `epimetheus_test_` prefix to clearly identify them as test data. 
+ +### Counter: `epimetheus_test_requests_total` +- **Type:** Counter (monotonically increasing) +- **Description:** Total number of requests processed +- **Use case:** Counting total events, requests, errors + +### Gauge: `epimetheus_test_active_connections` +- **Type:** Gauge (can increase or decrease) +- **Description:** Current number of active connections (0-100) +- **Use case:** Current state measurements, capacity + +### Gauge: `epimetheus_test_temperature_celsius` +- **Type:** Gauge +- **Description:** Current temperature in Celsius (0-50°C) +- **Use case:** Environmental monitoring + +### Histogram: `epimetheus_test_request_duration_seconds` +- **Type:** Histogram (distribution) +- **Description:** Request duration distribution +- **Buckets:** 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 seconds +- **Use case:** Latency measurements, SLO tracking + +### Labeled Counter: `epimetheus_test_jobs_processed_total` +- **Type:** Counter with labels +- **Description:** Jobs processed by type and status +- **Labels:** + - `job_type`: email, report, backup + - `status`: success, failed +- **Use case:** Categorized counting, multi-dimensional metrics + +## Grafana Dashboard + +A comprehensive dashboard is available showcasing all test metrics. + +### Dashboard Features + +- **8 Panels:** + 1. Request Rate (line graph) + 2. Total Requests (stat panel) + 3. Active Connections (gauge with thresholds) + 4. Temperature (gauge with thresholds) + 5. Request Duration Histogram (p50, p90, p99) + 6. Average Request Duration (stat) + 7. Jobs Processed by Type (bar gauge) + 8. 
Jobs Status Breakdown (table) + +- **Auto-refresh:** Every 10 seconds +- **Time range:** Last 15 minutes (customizable) +- **Dark theme optimized** + +### Deploy Dashboard + +#### Option 1: Helm/Kubernetes ConfigMap (Recommended) + +```bash +# Deploy via Kubernetes ConfigMap +kubectl apply -f ../prometheus/epimetheus-dashboard.yaml +``` + +The dashboard will be automatically discovered by Grafana. + +#### Option 2: Manual Import + +```bash +# Port-forward Grafana +kubectl port-forward -n monitoring svc/prometheus-grafana 3000:80 + +# Open Grafana +open http://localhost:3000 + +# Go to Dashboards → Import → Upload grafana-dashboard.json +``` + +#### Option 3: Automated Script + +```bash +# Deploy via API +./deploy-dashboard.sh + +# Or with custom credentials +GRAFANA_URL="http://localhost:3000" \ +GRAFANA_USER="admin" \ +GRAFANA_PASSWORD="yourpassword" \ +./deploy-dashboard.sh +``` + +## Example Queries + +### Basic Queries + +```promql +# View total requests +epimetheus_test_requests_total + +# View request rate over last 5 minutes +rate(epimetheus_test_requests_total[5m]) + +# View current active connections +epimetheus_test_active_connections + +# View current temperature +epimetheus_test_temperature_celsius +``` + +### Histogram Queries + +```promql +# 95th percentile request duration +histogram_quantile(0.95, rate(epimetheus_test_request_duration_seconds_bucket[5m])) + +# 50th percentile (median) +histogram_quantile(0.50, rate(epimetheus_test_request_duration_seconds_bucket[5m])) + +# Average request duration +rate(epimetheus_test_request_duration_seconds_sum[5m]) / +rate(epimetheus_test_request_duration_seconds_count[5m]) +``` + +### Labeled Counter Queries + +```promql +# Failed jobs by type +epimetheus_test_jobs_processed_total{status="failed"} + +# Job success rate +rate(epimetheus_test_jobs_processed_total{status="success"}[5m]) / +rate(epimetheus_test_jobs_processed_total[5m]) + +# Total jobs by type +sum by (job_type) 
(epimetheus_test_jobs_processed_total) +``` + +### Curl Examples + +```bash +# Port-forward Prometheus +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 & + +# Query total requests +curl -s "http://localhost:9090/api/v1/query?query=epimetheus_test_requests_total" | jq . + +# Query temperature +curl -s "http://localhost:9090/api/v1/query?query=epimetheus_test_temperature_celsius" | jq . + +# Query request rate +curl -s "http://localhost:9090/api/v1/query?query=rate(epimetheus_test_requests_total[5m])" | jq . + +# Query histogram p95 +curl -s "http://localhost:9090/api/v1/query?query=histogram_quantile(0.95,rate(epimetheus_test_request_duration_seconds_bucket[5m]))" | jq . +``` + +## Time Range Limitations + +### ✅ Supported Time Ranges + +| Time Range | Status | Method | +|------------|--------|--------| +| Current (< 5 min) | ✅ Works | Pushgateway | +| 1 hour old | ✅ Works | Remote Write | +| 1 day old | ✅ Works | Remote Write | +| 1 week old | ✅ Works | Remote Write | +| 1 month old | ✅ Works | Remote Write | + +### ⚠️ Potential Issues + +- **Future timestamps:** Rejected (> 5 minutes in future) +- **Very old data (6+ months):** May be rejected depending on Prometheus retention +- **Years old:** Likely rejected - use `promtool tsdb create-blocks-from` instead +- **Out-of-order samples:** Can't insert older data into existing time series (use different labels) + +### Prometheus Configuration + +Check your retention settings: + +```bash +# View retention +kubectl get prometheus -n monitoring prometheus-kube-prometheus-prometheus \ + -o jsonpath='{.spec.retention}' + +# Default is typically 15 days +``` + +For very old data: +- Increase retention in Prometheus config +- Enable out-of-order ingestion (experimental) +- Use `promtool` for direct TSDB block creation + +## Project Structure + +``` +epimetheus/ +├── cmd/ +│ └── epimetheus/ +│ └── main.go # Main entry point +├── internal/ +│ ├── config/ # Configuration +│ ├── metrics/ # 
Metric generators +│ ├── parser/ # CSV/JSON parsers (includes tabular CSV) +│ ├── ingester/ # Pushgateway & Remote Write ingesters +│ └── watcher/ # File watcher for watch mode +├── epimetheus # Compiled binary +├── grafana-dashboard.json # Grafana dashboard definition +├── deploy-dashboard.sh # Dashboard deployment script +├── generate-test-data.sh # Test data generator +├── run.sh # Helper script +└── README.md # This file +``` + +## Setup Requirements + +### 1. Enable Prometheus Remote Write Receiver ⚠️ **REQUIRED for Historic Data** + +**IMPORTANT**: To use historic mode, backfill mode, or auto mode with old data, you **must** enable the Prometheus Remote Write receiver. Without this feature, Epimetheus can only push realtime data via Pushgateway. + +The Remote Write receiver is configured in the [conf repository](https://codeberg.org/snonux/conf) at `f3s/prometheus/persistence-values.yaml`: + +```yaml +# In prometheus/persistence-values.yaml (from conf repository) +prometheus: + prometheusSpec: + # Enable Remote Write receiver endpoint and Admin API (Prometheus 3.x syntax) + additionalArgs: + - name: web.enable-remote-write-receiver + value: "" + - name: web.enable-admin-api + value: "" + + # Enable out-of-order ingestion for backfilling + # Allows writing data points older than existing data for the same time series + enableFeatures: + - exemplar-storage + - otlp-write-receiver + + # Allow backfilling up to 31 days in the past (provides 1-day buffer for 30-day datasets) + tsdb: + outOfOrderTimeWindow: 744h # 31 days +``` + +**What This Enables:** +- **Remote Write API**: HTTP endpoint at `/api/v1/write` for ingesting metrics with custom timestamps +- **Admin API**: HTTP endpoints at `/api/v1/admin/tsdb/*` for data deletion and management +- **Out-of-Order Ingestion**: Allows writing data points older than existing data for the same time series +- **31-Day Window**: Can backfill data up to 31 days in the past (provides 1-day buffer for 30-day datasets) + 
+After updating the configuration, upgrade your Prometheus installation: + +```bash +cd conf/f3s/prometheus +just upgrade # Or manually: +# helm upgrade prometheus prometheus-community/kube-prometheus-stack \ +# -n monitoring -f persistence-values.yaml +``` + +Verify the features are enabled: + +```bash +# Check Remote Write receiver flag +kubectl get pod -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 \ + -o jsonpath='{.spec.containers[0].args}' | grep -o "web.enable-remote-write-receiver" + +# Check out-of-order time window +kubectl get prometheus -n monitoring prometheus-kube-prometheus-prometheus \ + -o jsonpath='{.spec.tsdb.outOfOrderTimeWindow}' +# Should output: 744h + +# Check admin API flag +kubectl get pod -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 \ + -o jsonpath='{.spec.containers[0].args}' | grep -o "web.enable-admin-api" +``` + +**Performance Considerations:** + +This configuration is designed for ad-hoc troubleshooting and development, **NOT production use**. Enabling these features has trade-offs: + +- **Increased Memory Usage**: Out-of-order ingestion requires additional memory for buffering and sorting time series +- **Higher TSDB Overhead**: Prometheus TSDB needs to handle non-sequential writes, increasing disk I/O +- **Query Performance**: Queries may be slower due to fragmented data blocks +- **Storage Amplification**: Out-of-order samples can trigger additional compactions, increasing storage usage + +**Recommendation for Production:** +- Keep `outOfOrderTimeWindow` as small as possible (or disabled) +- Monitor Prometheus memory and disk usage closely +- Use Remote Write only when necessary +- Consider using dedicated testing/development Prometheus instances + +**Note**: The syntax changed in Prometheus 3.x - use `additionalArgs` with `web.enable-remote-write-receiver` instead of the deprecated `enableFeatures: [remote-write-receiver]`. + +### 2. 
Update Prometheus Scrape Config + +Ensure Pushgateway is in scrape targets: + +```yaml +# additional-scrape-configs.yaml +- job_name: 'pushgateway' + honor_labels: true + static_configs: + - targets: + - 'pushgateway.monitoring.svc.cluster.local:9091' +``` + +Apply the configuration: + +```bash +kubectl create secret generic additional-scrape-configs \ + --from-file=/home/paul/git/conf/f3s/prometheus/additional-scrape-configs.yaml \ + --dry-run=client -o yaml -n monitoring | kubectl apply -f - +``` + +## Building from Source + +### Using Mage (Recommended) + +This project includes a [Magefile](./MAGEFILE.md) for easy building, testing, and running: + +```bash +# Install Mage (one-time setup) +go install github.com/magefile/mage@latest + +# Build binary +mage build + +# Run tests +mage test + +# Run with coverage report +mage testCoverage + +# Run in realtime mode +mage run + +# See all available targets +mage -l +``` + +See [MAGEFILE.md](./MAGEFILE.md) for complete documentation. + +### Using Go directly + +```bash +# Build binary +go build -o epimetheus cmd/epimetheus/main.go + +# Run tests +go test ./... -v + +# Check test coverage +go test ./... 
-cover +``` + +## Troubleshooting + +### Binary can't connect to Pushgateway + +```bash +# Check port-forward is running +ps aux | grep "port-forward.*9091" + +# Restart port-forward +kubectl port-forward -n monitoring svc/pushgateway 9091:9091 +``` + +### Metrics not appearing in Prometheus + +```bash +# Check Pushgateway has metrics +curl http://localhost:9091/metrics | grep "epimetheus_test" + +# Check Prometheus scrape targets +# Open http://localhost:9090/targets - look for "pushgateway" job + +# Check Prometheus logs +kubectl logs -n monitoring -l app.kubernetes.io/name=prometheus +``` + +### "Remote write receiver not enabled" error + +```bash +# Verify feature is enabled +kubectl logs -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 | grep "remote-write-receiver" + +# Should see: msg="Experimental features enabled" features=[remote-write-receiver] +``` + +### "Out of order sample" error + +This occurs when trying to insert data older than existing data for the same time series.
+ +**Solutions:** +- Use different job labels for historic data (e.g., `job="historic_data"`) +- Enable out-of-order ingestion in Prometheus (experimental) +- Ensure backfill goes from oldest to newest + +### Dashboard not appearing in Grafana + +```bash +# Check ConfigMap exists +kubectl get configmap -n monitoring | grep epimetheus + +# Check labels +kubectl get configmap epimetheus-dashboard -n monitoring -o yaml | grep "grafana_dashboard" + +# Restart Grafana to force reload +kubectl rollout restart deployment/prometheus-grafana -n monitoring +``` + +## Architecture + +``` +┌─────────────────┐ +│ Go Binary │ +│ (prometheus- │──Push realtime──┐ +│ pusher) │ │ +└─────────────────┘ ▼ + │ ┌──────────────────┐ + │ │ Pushgateway │◄──Scrape──┐ + │ │ (Port 9091) │ │ + │ └──────────────────┘ │ + │ │ + └──Push historic──────────────────┐ │ + ▼ │ + ┌─────────────────┐ │ + │ Prometheus │◄────┘ + │ (Port 9090) │ + │ Remote Write API│ + └─────────────────┘ + │ + │ Datasource + ▼ + ┌─────────────────┐ + │ Grafana │ + │ (Port 3000) │ + │ Dashboards │ + └─────────────────┘ +``` + +## Best Practices + +### When to Use Pushgateway vs. 
Remote Write + +**Use Pushgateway (realtime mode):** +- Short-lived batch jobs +- Service-level metrics +- Jobs behind firewalls +- Current/recent data (< 5 minutes old) + +**Use Remote Write (historic mode):** +- Historic data import +- Backfilling gaps +- Data migration +- Data older than 5 minutes + +**Use Auto Mode:** +- Mixed current and historic data +- Importing from files +- Unknown timestamp ages +- General-purpose ingestion + +### Metric Design + +- **Use appropriate metric types:** + - Counter for cumulative values (requests, errors) + - Gauge for point-in-time values (temperature, connections) + - Histogram for distributions (latency, sizes) + +- **Label cardinality:** + - Include meaningful labels + - Avoid high-cardinality labels (user IDs, timestamps) + - Keep label combinations reasonable (< 1000 per metric) + +- **Naming conventions:** + - Use descriptive names + - Include units in gauge names (\_celsius, \_bytes) + - Use \_total suffix for counters + +## Cleanup + +### Cleaning Up Benchmark Data from Prometheus + +For cleaning up benchmark metrics from Prometheus, use the provided cleanup script: + +```bash +# Port-forward to Prometheus +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 & + +# Run the cleanup script +./cleanup-benchmark-data.sh +``` + +The script will: +1. Delete all `epimetheus_benchmark_*` metrics using the Prometheus Admin API +2. Clean up tombstones to free disk space +3. 
Provide clear success/error feedback + +**Manual cleanup** (if you prefer): + +```bash +# Delete specific metric +curl -X POST 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]=epimetheus_benchmark_cpu_usage' + +# Clean up tombstones +curl -X POST 'http://localhost:9090/api/v1/admin/tsdb/clean_tombstones' +``` + +### Other Cleanup Tasks + +```bash +# Stop port-forwards +pkill -f "port-forward.*9091" +pkill -f "port-forward.*9090" +pkill -f "port-forward.*3000" + +# Delete test metrics from Pushgateway +curl -X DELETE http://localhost:9091/metrics/job/example_metrics_pusher + +# Uninstall Pushgateway (if needed) +helm uninstall pushgateway -n monitoring +``` + +## MacOS Setup + +### Basic Installation + +```bash +brew install prometheus +brew install grafana +go install github.com/prometheus/pushgateway@latest +brew services start grafana +brew services start prometheus +~/go/bin/pushgateway & +``` + +Once done, login to http://localhost:3000 as admin:admin, you will be prompted to change the password. Afterwards, add http://localhost:9090 as a Prometheus datasource. + +### Enable Remote Write Receiver (Required for Watch Mode) + +⚠️ **Important**: Watch mode, historic mode, backfill mode, and auto mode require the Prometheus Remote Write receiver to be enabled. + +#### Option 1: Permanent Configuration (Recommended) + +Edit the Prometheus arguments file: + +```bash +# Edit the arguments file +nano /opt/homebrew/etc/prometheus.args +``` + +Add this line at the end: +``` +--web.enable-remote-write-receiver +``` + +The complete file should look like: +``` +--config.file /opt/homebrew/etc/prometheus.yml +--web.listen-address=127.0.0.1:9090 +--storage.tsdb.path /opt/homebrew/var/prometheus +--web.enable-remote-write-receiver +--web.enable-admin-api +``` + +**Note:** `--web.enable-admin-api` is optional but recommended for easier data management (allows deleting old metrics). 
+ +Restart Prometheus: +```bash +brew services restart prometheus +``` + +Verify it's working: +```bash +# Check Prometheus is healthy +curl http://localhost:9090/-/healthy + +# Test Remote Write endpoint (should return 400, not 404) +curl -X POST http://localhost:9090/api/v1/write +``` + +#### Option 2: Temporary (For Testing) + +Stop the service and start manually: + +```bash +# Stop brew service +brew services stop prometheus + +# Start with Remote Write enabled +prometheus --web.enable-remote-write-receiver +``` + +Keep this terminal open. In another terminal, run your epimetheus commands. + +**Note**: This only lasts until you stop the terminal. Use Option 1 for permanent setup. + +### Clearing Old Metrics (Optional) + +If you need to delete old metrics and start fresh: + +```bash +# Delete specific metrics (e.g., blockstore) +curl -X POST -g 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]={__name__=~"blockstore_.*"}' + +# Clean up deleted data +curl -X POST http://localhost:9090/api/v1/admin/tsdb/clean_tombstones + +# Wait a moment for cleanup +sleep 2 +``` + +**Note:** Admin API must be enabled (add `--web.enable-admin-api` to prometheus.args). 
+
+### Verify Setup
+
+Once Remote Write is enabled, test watch mode:
+
+```bash
+# Create a test CSV
+cat > /tmp/test.csv << EOF
+status,count,method
+200,100,GET
+404,50,POST
+EOF
+
+# Watch the file
+./epimetheus -mode=watch \
+  -file=/tmp/test.csv \
+  -metric-name=test \
+  -prometheus=http://localhost:9090/api/v1/write
+```
+
+You should see:
+```
+✅ Successfully pushed X samples to Prometheus
+```
+
+Query in Prometheus (http://localhost:9090):
+```promql
+{__name__=~"test_.*"}
+```
+
+## Additional Resources
+
+- [Prometheus Documentation](https://prometheus.io/docs/)
+- [Pushgateway Documentation](https://github.com/prometheus/pushgateway)
+- [Prometheus Remote Write Spec](https://prometheus.io/docs/concepts/remote_write_spec/)
+- [Grafana Documentation](https://grafana.com/docs/)
+
+## Version
+
+Current version: 0.0.0
+
+## License
+
+See LICENSE file for details.
diff --git a/backfill-historic-data.sh b/backfill-historic-data.sh
new file mode 100755
index 0000000..fa0e065
--- /dev/null
+++ b/backfill-historic-data.sh
@@ -0,0 +1,63 @@
+#!/bin/bash
+# Backfill historic data to Prometheus for Epimetheus dashboard.
+# Sets up a port-forward, runs epimetheus in backfill mode, then cleans up
+# and reports success/failure based on the captured exit code.
+
+set -e
+
+echo "=== Epimetheus Historic Data Backfill ==="
+echo ""
+echo "This script will populate Prometheus with historic test data"
+echo "going back 7 days, with data points every 12 hours."
+echo ""
+
+# Port-forward to Prometheus
+echo "Step 1: Setting up port-forward to Prometheus..."
+kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/epimetheus-prom-pf.log 2>&1 &
+PF_PID=$!
+echo "Port-forward started (PID: $PF_PID)"
+
+# Wait for port-forward to be ready
+sleep 5
+
+# Run backfill
+echo ""
+echo "Step 2: Backfilling data from 7 days ago to now (12-hour intervals)..."
+echo ""
+# NOTE: with `set -e` active, a plain `EXIT_CODE=$?` after the command is
+# unreachable on failure (errexit would terminate the script first), leaving
+# the cleanup and failure-reporting branch below as dead code. Capturing the
+# status via `||` both records it and exempts this command from errexit.
+EXIT_CODE=0
+./epimetheus -mode=backfill \
+  -prometheus=http://localhost:9090/api/v1/write \
+  -start-hours=168 \
+  -end-hours=0 \
+  -interval=12 || EXIT_CODE=$?
+
+# Clean up
+echo ""
+echo "Step 3: Cleaning up port-forward..."
+kill $PF_PID 2>/dev/null || true + +if [ $EXIT_CODE -eq 0 ]; then + echo "" + echo "✅ Historic data backfill complete!" + echo "" + echo "The Grafana dashboard timeline should now show data from:" + echo " - 7 days ago" + echo " - 6 days ago" + echo " - 5 days ago" + echo " - 4 days ago" + echo " - 3 days ago" + echo " - 2 days ago" + echo " - 1 day ago" + echo " - 12 hours ago" + echo " - Now (from previous realtime push)" + echo "" + echo "View the dashboard at: https://grafana.f3s.buetow.org/d/epimetheus-test/epimetheus-test-metrics" +else + echo "" + echo "❌ Backfill failed with exit code $EXIT_CODE" + echo "Check /tmp/epimetheus-prom-pf.log for port-forward logs" +fi + +exit $EXIT_CODE diff --git a/benchmark-100mb.sh b/benchmark-100mb.sh new file mode 100755 index 0000000..1d3fad0 --- /dev/null +++ b/benchmark-100mb.sh @@ -0,0 +1,223 @@ +#!/bin/bash +# Benchmark script: Generate and ingest 100MB of historic metrics +# This tests Epimetheus performance with large-scale data ingestion + +set -e + +# Optimize Go GC for better performance (Phase 3 optimization) +export GOGC=200 # Reduce GC frequency (default 100) +export GOMEMLIMIT=3GiB # Set memory limit for Go 1.19+ + +BENCHMARK_DIR="benchmark-results" +TIMESTAMP=$(date +%Y%m%d-%H%M%S) +RESULT_FILE="$BENCHMARK_DIR/benchmark-$TIMESTAMP.log" + +mkdir -p "$BENCHMARK_DIR" + +echo "=== Epimetheus 100MB Benchmark ===" | tee "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Timestamp: $(date)" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 1: Generate 100MB of test data +echo "Step 1: Generating 100MB of test data..." 
| tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Calculate: ~70 bytes per line, 100MB = ~1.5M lines +TARGET_SIZE_MB=100 +TARGET_BYTES=$((TARGET_SIZE_MB * 1024 * 1024)) +BYTES_PER_LINE=70 +TARGET_LINES=$((TARGET_BYTES / BYTES_PER_LINE)) + +echo "Target size: ${TARGET_SIZE_MB}MB" | tee -a "$RESULT_FILE" +echo "Estimated lines needed: $TARGET_LINES" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Generate data going back 7 days with 1-minute intervals +# This gives us ~10,080 data points across 7 days +# We'll generate multiple metrics per timestamp to reach 100MB +# All data is historic (> 5 minutes old) to use Remote Write API exclusively + +GENERATION_START=$(date +%s) + +NOW=$(date +%s)000 # Current time in milliseconds +ONE_HOUR_AGO=$((NOW - 3600000)) # Start from 1 hour ago to ensure all data is historic +SEVEN_DAYS_AGO=$((ONE_HOUR_AGO - 604800000)) # 7 days before that + +# CSV header +cat > benchmark-data-100mb.csv << 'EOF' +# Prometheus metrics - 100MB benchmark dataset +# Format: metric_name,labels,value,timestamp_ms +EOF + +# Generate metrics +# We'll create ~150 unique time series, each with ~10,000 data points = 1.5M samples +METRICS=( + "epimetheus_benchmark_cpu_usage" + "epimetheus_benchmark_memory_bytes" + "epimetheus_benchmark_disk_io_bytes" + "epimetheus_benchmark_network_rx_bytes" + "epimetheus_benchmark_network_tx_bytes" + "epimetheus_benchmark_requests_total" + "epimetheus_benchmark_errors_total" + "epimetheus_benchmark_response_time_ms" + "epimetheus_benchmark_active_connections" + "epimetheus_benchmark_queue_depth" +) + +INSTANCES=( + "web-01" "web-02" "web-03" "web-04" "web-05" + "api-01" "api-02" "api-03" "api-04" "api-05" + "db-01" "db-02" "db-03" "worker-01" "worker-02" +) + +INTERVAL_MS=60000 # 1 minute interval +TOTAL_INTERVALS=10080 # 7 days of 1-minute intervals + +echo "Generating data..." 
| tee -a "$RESULT_FILE" +LINES_GENERATED=0 + +for ((i=0; i<TOTAL_INTERVALS; i++)); do + TIMESTAMP=$((SEVEN_DAYS_AGO + (i * INTERVAL_MS))) + + # Generate a sample for each metric x instance combination + for METRIC in "${METRICS[@]}"; do + for INSTANCE in "${INSTANCES[@]}"; do + VALUE=$((RANDOM % 1000)) + echo "$METRIC,instance=$INSTANCE;env=benchmark,$VALUE,$TIMESTAMP" >> benchmark-data-100mb.csv + LINES_GENERATED=$((LINES_GENERATED + 1)) + done + done + + # Progress indicator every 1000 intervals + if [ $((i % 1000)) -eq 0 ]; then + PROGRESS=$((i * 100 / TOTAL_INTERVALS)) + echo -ne "\rProgress: $PROGRESS% ($LINES_GENERATED lines)" | tee -a "$RESULT_FILE" + fi +done + +echo "" | tee -a "$RESULT_FILE" + +GENERATION_END=$(date +%s) +GENERATION_TIME=$((GENERATION_END - GENERATION_START)) + +# Get actual file size +FILE_SIZE=$(stat -f%z benchmark-data-100mb.csv 2>/dev/null || stat -c%s benchmark-data-100mb.csv 2>/dev/null) +FILE_SIZE_MB=$((FILE_SIZE / 1024 / 1024)) + +echo "" | tee -a "$RESULT_FILE" +echo "Data generation complete:" | tee -a "$RESULT_FILE" +echo " Lines generated: $LINES_GENERATED" | tee -a "$RESULT_FILE" +echo " File size: ${FILE_SIZE_MB}MB ($FILE_SIZE bytes)" | tee -a "$RESULT_FILE" +echo " Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 2: Start port-forward to Prometheus +echo "Step 2: Setting up port-forward to Prometheus..." | tee -a "$RESULT_FILE" +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/benchmark-pf.log 2>&1 & +PF_PID=$! +echo "Port-forward started (PID: $PF_PID)" | tee -a "$RESULT_FILE" +sleep 8 # Wait for port-forward to be ready +echo "" | tee -a "$RESULT_FILE" + +# Step 3: Get baseline Prometheus metrics +echo "Step 3: Collecting baseline Prometheus metrics..." 
| tee -a "$RESULT_FILE" +PROM_POD=$(kubectl get pod -n monitoring -l app.kubernetes.io/name=prometheus -o jsonpath='{.items[0].metadata.name}') +echo "Prometheus pod: $PROM_POD" | tee -a "$RESULT_FILE" + +# Get memory and CPU usage before ingestion +BASELINE_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}') +BASELINE_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}') + +echo " Baseline memory: $BASELINE_MEMORY" | tee -a "$RESULT_FILE" +echo " Baseline CPU: $BASELINE_CPU" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 4: Run ingestion benchmark +echo "Step 4: Running ingestion benchmark..." | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +INGEST_START=$(date +%s.%N) + +# Run epimetheus with time measurement +# Use CSV mode with Remote Write API (all data is historic) +# Note: We can't use auto mode because it requires both Pushgateway and Remote Write +# Instead, we'll implement a direct CSV->Remote Write ingestion + +echo "Parsing CSV and preparing for Remote Write ingestion..." | tee -a "$RESULT_FILE" + +# For now, use backfill mode to process the CSV data +# We'll need to enhance epimetheus to support pure CSV->RemoteWrite mode +echo "WARNING: Using auto mode - this may fail if data is too recent" | tee -a "$RESULT_FILE" +echo "Continuing with Remote Write API for historic data..." 
| tee -a "$RESULT_FILE" + +/usr/bin/time -v ./epimetheus \ + -mode=auto \ + -file=benchmark-data-100mb.csv \ + -format=csv \ + -prometheus=http://localhost:9090/api/v1/write \ + -pushgateway=http://localhost:9091 \ + 2>&1 | tee -a "$RESULT_FILE" || true # Continue even if pushgateway fails + +INGEST_END=$(date +%s.%N) + +# Calculate ingestion time +INGEST_TIME=$(echo "$INGEST_END - $INGEST_START" | bc) + +echo "" | tee -a "$RESULT_FILE" +echo "Ingestion complete:" | tee -a "$RESULT_FILE" +echo " Total time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE" + +# Calculate throughput +SAMPLES_PER_SECOND=$(echo "scale=2; $LINES_GENERATED / $INGEST_TIME" | bc) +MB_PER_SECOND=$(echo "scale=2; $FILE_SIZE_MB / $INGEST_TIME" | bc) + +echo " Samples/second: $SAMPLES_PER_SECOND" | tee -a "$RESULT_FILE" +echo " MB/second: $MB_PER_SECOND" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 5: Get post-ingestion Prometheus metrics +echo "Step 5: Collecting post-ingestion Prometheus metrics..." | tee -a "$RESULT_FILE" +sleep 5 # Wait for metrics to stabilize + +POST_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}') +POST_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}') + +echo " Post-ingestion memory: $POST_MEMORY" | tee -a "$RESULT_FILE" +echo " Post-ingestion CPU: $POST_CPU" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 6: Query some data to verify ingestion +echo "Step 6: Verifying data ingestion..." | tee -a "$RESULT_FILE" +QUERY_RESULT=$(curl -s "http://localhost:9090/api/v1/query?query=count(epimetheus_benchmark_cpu_usage)" | jq -r '.data.result[0].value[1]') +echo " Samples found for epimetheus_benchmark_cpu_usage: $QUERY_RESULT" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 7: Cleanup +echo "Step 7: Cleaning up..." 
| tee -a "$RESULT_FILE" +kill $PF_PID 2>/dev/null || true +echo "" | tee -a "$RESULT_FILE" + +# Summary +echo "=== BENCHMARK SUMMARY ===" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Dataset:" | tee -a "$RESULT_FILE" +echo " Size: ${FILE_SIZE_MB}MB" | tee -a "$RESULT_FILE" +echo " Samples: $LINES_GENERATED" | tee -a "$RESULT_FILE" +echo " Time range: 7 days" | tee -a "$RESULT_FILE" +echo " Interval: 1 minute" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Performance:" | tee -a "$RESULT_FILE" +echo " Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE" +echo " Ingestion time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE" +echo " Throughput: $SAMPLES_PER_SECOND samples/s" | tee -a "$RESULT_FILE" +echo " Throughput: $MB_PER_SECOND MB/s" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Resources:" | tee -a "$RESULT_FILE" +echo " Memory: $BASELINE_MEMORY -> $POST_MEMORY" | tee -a "$RESULT_FILE" +echo " CPU: $BASELINE_CPU -> $POST_CPU" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Results saved to: $RESULT_FILE" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "To view results: cat $RESULT_FILE" +echo "To analyze: less $RESULT_FILE" diff --git a/benchmark-1gb.sh b/benchmark-1gb.sh new file mode 100755 index 0000000..f715376 --- /dev/null +++ b/benchmark-1gb.sh @@ -0,0 +1,223 @@ +#!/bin/bash +# Benchmark script: Generate and ingest 1GB of historic metrics +# This tests Epimetheus performance with large-scale data ingestion + +set -e + +# Optimize Go GC for better performance (Phase 3 optimization) +export GOGC=200 # Reduce GC frequency (default 100) +export GOMEMLIMIT=3GiB # Set memory limit for Go 1.19+ + +BENCHMARK_DIR="benchmark-results" +TIMESTAMP=$(date +%Y%m%d-%H%M%S) +RESULT_FILE="$BENCHMARK_DIR/benchmark-1gb-$TIMESTAMP.log" + +mkdir -p "$BENCHMARK_DIR" + +echo "=== Epimetheus 1GB Benchmark ===" | tee "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Timestamp: $(date)" | 
tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 1: Generate 1GB of test data +echo "Step 1: Generating 1GB of test data..." | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Calculate: ~80 bytes per line, 1GB = ~13M lines +TARGET_SIZE_MB=1000 +TARGET_BYTES=$((TARGET_SIZE_MB * 1024 * 1024)) +BYTES_PER_LINE=80 +TARGET_LINES=$((TARGET_BYTES / BYTES_PER_LINE)) + +echo "Target size: ${TARGET_SIZE_MB}MB" | tee -a "$RESULT_FILE" +echo "Estimated lines needed: $TARGET_LINES" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Generate data going back 30 days with 30-second intervals +# This gives us ~86,400 data points across 30 days (respects Prometheus 720h out-of-order limit) +# We'll generate multiple metrics per timestamp to reach 1GB +# All data is historic (> 5 minutes old) to use Remote Write API exclusively + +GENERATION_START=$(date +%s) + +NOW=$(date +%s)000 # Current time in milliseconds +ONE_HOUR_AGO=$((NOW - 3600000)) # Start from 1 hour ago to ensure all data is historic +THIRTY_DAYS_AGO=$((ONE_HOUR_AGO - 2592000000)) # 30 days before that (30 * 24 * 60 * 60 * 1000) + +# CSV header +cat > benchmark-data-1gb.csv << 'EOF' +# Prometheus metrics - 1GB benchmark dataset +# Format: metric_name,labels,value,timestamp_ms +EOF + +# Generate metrics +# We'll create ~150 unique time series, each with ~86,400 data points = 13M samples +METRICS=( + "epimetheus_benchmark_cpu_usage" + "epimetheus_benchmark_memory_bytes" + "epimetheus_benchmark_disk_io_bytes" + "epimetheus_benchmark_network_rx_bytes" + "epimetheus_benchmark_network_tx_bytes" + "epimetheus_benchmark_requests_total" + "epimetheus_benchmark_errors_total" + "epimetheus_benchmark_response_time_ms" + "epimetheus_benchmark_active_connections" + "epimetheus_benchmark_queue_depth" +) + +INSTANCES=( + "web-01" "web-02" "web-03" "web-04" "web-05" + "api-01" "api-02" "api-03" "api-04" "api-05" + "db-01" "db-02" "db-03" "worker-01" "worker-02" +) + +INTERVAL_MS=30000 # 30 second 
interval (to maintain 1GB size with 30 days) +TOTAL_INTERVALS=86400 # 30 days of 30-second intervals + +echo "Generating data..." | tee -a "$RESULT_FILE" +LINES_GENERATED=0 + +for ((i=0; i<TOTAL_INTERVALS; i++)); do + TIMESTAMP=$((THIRTY_DAYS_AGO + (i * INTERVAL_MS))) + + # Generate a sample for each metric x instance combination + for METRIC in "${METRICS[@]}"; do + for INSTANCE in "${INSTANCES[@]}"; do + VALUE=$((RANDOM % 1000)) + echo "$METRIC,instance=$INSTANCE;env=benchmark,$VALUE,$TIMESTAMP" >> benchmark-data-1gb.csv + LINES_GENERATED=$((LINES_GENERATED + 1)) + done + done + + # Progress indicator every 5000 intervals + if [ $((i % 5000)) -eq 0 ]; then + PROGRESS=$((i * 100 / TOTAL_INTERVALS)) + echo -ne "\rProgress: $PROGRESS% ($LINES_GENERATED lines)" | tee -a "$RESULT_FILE" + fi +done + +echo "" | tee -a "$RESULT_FILE" + +GENERATION_END=$(date +%s) +GENERATION_TIME=$((GENERATION_END - GENERATION_START)) + +# Get actual file size +FILE_SIZE=$(stat -f%z benchmark-data-1gb.csv 2>/dev/null || stat -c%s benchmark-data-1gb.csv 2>/dev/null) +FILE_SIZE_MB=$((FILE_SIZE / 1024 / 1024)) + +echo "" | tee -a "$RESULT_FILE" +echo "Data generation complete:" | tee -a "$RESULT_FILE" +echo " Lines generated: $LINES_GENERATED" | tee -a "$RESULT_FILE" +echo " File size: ${FILE_SIZE_MB}MB ($FILE_SIZE bytes)" | tee -a "$RESULT_FILE" +echo " Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 2: Start port-forward to Prometheus +echo "Step 2: Setting up port-forward to Prometheus..." | tee -a "$RESULT_FILE" +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/benchmark-pf.log 2>&1 & +PF_PID=$! +echo "Port-forward started (PID: $PF_PID)" | tee -a "$RESULT_FILE" +sleep 8 # Wait for port-forward to be ready +echo "" | tee -a "$RESULT_FILE" + +# Step 3: Get baseline Prometheus metrics +echo "Step 3: Collecting baseline Prometheus metrics..." 
| tee -a "$RESULT_FILE" +PROM_POD=$(kubectl get pod -n monitoring -l app.kubernetes.io/name=prometheus -o jsonpath='{.items[0].metadata.name}') +echo "Prometheus pod: $PROM_POD" | tee -a "$RESULT_FILE" + +# Get memory and CPU usage before ingestion +BASELINE_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}') +BASELINE_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}') + +echo " Baseline memory: $BASELINE_MEMORY" | tee -a "$RESULT_FILE" +echo " Baseline CPU: $BASELINE_CPU" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 4: Run ingestion benchmark +echo "Step 4: Running ingestion benchmark..." | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +INGEST_START=$(date +%s.%N) + +# Run epimetheus with time measurement +# Use CSV mode with Remote Write API (all data is historic) +# Note: We can't use auto mode because it requires both Pushgateway and Remote Write +# Instead, we'll implement a direct CSV->Remote Write ingestion + +echo "Parsing CSV and preparing for Remote Write ingestion..." | tee -a "$RESULT_FILE" + +# For now, use backfill mode to process the CSV data +# We'll need to enhance epimetheus to support pure CSV->RemoteWrite mode +echo "WARNING: Using auto mode - this may fail if data is too recent" | tee -a "$RESULT_FILE" +echo "Continuing with Remote Write API for historic data..." 
| tee -a "$RESULT_FILE" + +/usr/bin/time -v ./epimetheus \ + -mode=auto \ + -file=benchmark-data-1gb.csv \ + -format=csv \ + -prometheus=http://localhost:9090/api/v1/write \ + -pushgateway=http://localhost:9091 \ + 2>&1 | tee -a "$RESULT_FILE" || true # Continue even if pushgateway fails + +INGEST_END=$(date +%s.%N) + +# Calculate ingestion time +INGEST_TIME=$(echo "$INGEST_END - $INGEST_START" | bc) + +echo "" | tee -a "$RESULT_FILE" +echo "Ingestion complete:" | tee -a "$RESULT_FILE" +echo " Total time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE" + +# Calculate throughput +SAMPLES_PER_SECOND=$(echo "scale=2; $LINES_GENERATED / $INGEST_TIME" | bc) +MB_PER_SECOND=$(echo "scale=2; $FILE_SIZE_MB / $INGEST_TIME" | bc) + +echo " Samples/second: $SAMPLES_PER_SECOND" | tee -a "$RESULT_FILE" +echo " MB/second: $MB_PER_SECOND" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 5: Get post-ingestion Prometheus metrics +echo "Step 5: Collecting post-ingestion Prometheus metrics..." | tee -a "$RESULT_FILE" +sleep 5 # Wait for metrics to stabilize + +POST_MEMORY=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $3}') +POST_CPU=$(kubectl top pod -n monitoring "$PROM_POD" --no-headers | awk '{print $2}') + +echo " Post-ingestion memory: $POST_MEMORY" | tee -a "$RESULT_FILE" +echo " Post-ingestion CPU: $POST_CPU" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 6: Query some data to verify ingestion +echo "Step 6: Verifying data ingestion..." | tee -a "$RESULT_FILE" +QUERY_RESULT=$(curl -s "http://localhost:9090/api/v1/query?query=count(epimetheus_benchmark_cpu_usage)" | jq -r '.data.result[0].value[1]') +echo " Samples found for epimetheus_benchmark_cpu_usage: $QUERY_RESULT" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" + +# Step 7: Cleanup +echo "Step 7: Cleaning up..." 
| tee -a "$RESULT_FILE" +kill $PF_PID 2>/dev/null || true +echo "" | tee -a "$RESULT_FILE" + +# Summary +echo "=== BENCHMARK SUMMARY ===" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Dataset:" | tee -a "$RESULT_FILE" +echo " Size: ${FILE_SIZE_MB}MB" | tee -a "$RESULT_FILE" +echo " Samples: $LINES_GENERATED" | tee -a "$RESULT_FILE" +echo " Time range: 30 days" | tee -a "$RESULT_FILE" +echo " Interval: 30 seconds" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Performance:" | tee -a "$RESULT_FILE" +echo " Generation time: ${GENERATION_TIME}s" | tee -a "$RESULT_FILE" +echo " Ingestion time: ${INGEST_TIME}s" | tee -a "$RESULT_FILE" +echo " Throughput: $SAMPLES_PER_SECOND samples/s" | tee -a "$RESULT_FILE" +echo " Throughput: $MB_PER_SECOND MB/s" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Resources:" | tee -a "$RESULT_FILE" +echo " Memory: $BASELINE_MEMORY -> $POST_MEMORY" | tee -a "$RESULT_FILE" +echo " CPU: $BASELINE_CPU -> $POST_CPU" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "Results saved to: $RESULT_FILE" | tee -a "$RESULT_FILE" +echo "" | tee -a "$RESULT_FILE" +echo "To view results: cat $RESULT_FILE" +echo "To analyze: less $RESULT_FILE" diff --git a/cleanup-benchmark-data.sh b/cleanup-benchmark-data.sh new file mode 100755 index 0000000..a5409f1 --- /dev/null +++ b/cleanup-benchmark-data.sh @@ -0,0 +1,88 @@ +#!/bin/bash +# Cleanup script: Delete benchmark data from Prometheus +# This uses the Prometheus Admin API to selectively remove benchmark metrics + +set -e + +PROMETHEUS_URL="${1:-http://localhost:9090}" + +echo "=== Prometheus Benchmark Data Cleanup ===" +echo "" +echo "Prometheus URL: $PROMETHEUS_URL" +echo "" + +# Check if port-forward is needed +if [[ "$PROMETHEUS_URL" == *"localhost"* ]]; then + echo "Note: Make sure you have port-forward running:" + echo " kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090" + echo "" +fi + +# Metrics to 
delete +METRICS=( + "epimetheus_benchmark_cpu_usage" + "epimetheus_benchmark_memory_bytes" + "epimetheus_benchmark_disk_io_bytes" + "epimetheus_benchmark_network_rx_bytes" + "epimetheus_benchmark_network_tx_bytes" + "epimetheus_benchmark_requests_total" + "epimetheus_benchmark_errors_total" + "epimetheus_benchmark_response_time_ms" + "epimetheus_benchmark_active_connections" + "epimetheus_benchmark_queue_depth" +) + +echo "Step 1: Deleting benchmark metrics..." +echo "" + +SUCCESS_COUNT=0 +ERROR_COUNT=0 + +for METRIC in "${METRICS[@]}"; do + echo " Deleting: $METRIC" + + # Delete series endpoint returns HTTP 204 No Content on success + HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST -g "${PROMETHEUS_URL}/api/v1/admin/tsdb/delete_series?match[]=${METRIC}") + + if [ "$HTTP_CODE" == "204" ] || [ "$HTTP_CODE" == "200" ]; then + echo " ✅ Success (HTTP $HTTP_CODE)" + SUCCESS_COUNT=$((SUCCESS_COUNT + 1)) + else + echo " ❌ Error: HTTP $HTTP_CODE" + ERROR_COUNT=$((ERROR_COUNT + 1)) + fi +done + +echo "" +echo "Deletion summary: $SUCCESS_COUNT succeeded, $ERROR_COUNT failed" +echo "" + +if [ $ERROR_COUNT -eq 0 ]; then + echo "Step 2: Cleaning up tombstones..." + echo "" + + # Clean tombstones endpoint returns HTTP 204 No Content on success + CLEANUP_HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "${PROMETHEUS_URL}/api/v1/admin/tsdb/clean_tombstones") + + if [ "$CLEANUP_HTTP_CODE" == "204" ] || [ "$CLEANUP_HTTP_CODE" == "200" ]; then + echo " ✅ Tombstones cleaned successfully (HTTP $CLEANUP_HTTP_CODE)" + echo "" + echo "🎉 Cleanup complete!" + echo "" + echo "Note: Prometheus may take a few moments to compact the database" + echo "and free up disk space." + else + echo " ❌ Error cleaning tombstones: HTTP $CLEANUP_HTTP_CODE" + exit 1 + fi +else + echo "⚠️ Some deletions failed. Skipping tombstone cleanup." 
+ echo "Check Prometheus admin API is enabled with:" + echo " kubectl get prometheus -n monitoring prometheus-kube-prometheus-prometheus -o yaml | grep -A5 additionalArgs" + exit 1 +fi + +echo "" +echo "To verify deletion, run:" +echo " curl -s '${PROMETHEUS_URL}/api/v1/label/__name__/values' | jq '.data | map(select(startswith(\"epimetheus_benchmark\")))'" +echo "" diff --git a/cleanup-benchmark-metrics.sh b/cleanup-benchmark-metrics.sh new file mode 100755 index 0000000..d70aa95 --- /dev/null +++ b/cleanup-benchmark-metrics.sh @@ -0,0 +1,83 @@ +#!/bin/bash +# Cleanup benchmark metrics from Prometheus +# This allows running benchmarks from a clean state + +set -e + +echo "=== Prometheus Benchmark Metrics Cleanup ===" +echo "" + +# Port-forward to Prometheus +echo "Setting up port-forward to Prometheus..." +kubectl port-forward -n monitoring svc/prometheus-kube-prometheus-prometheus 9090:9090 > /tmp/cleanup-pf.log 2>&1 & +PF_PID=$! +echo "Port-forward started (PID: $PF_PID)" +sleep 5 + +# Check if Admin API is enabled +echo "" +echo "Checking if Prometheus Admin API is enabled..." +ADMIN_CHECK=$(curl -s -o /dev/null -w "%{http_code}" -X POST "http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]=epimetheus_benchmark_cpu_usage") + +if [ "$ADMIN_CHECK" = "204" ] || [ "$ADMIN_CHECK" = "200" ]; then + echo "✅ Admin API is enabled" + echo "" + echo "Deleting benchmark metrics..." 
+ + # Delete all benchmark metrics + METRICS=( + "epimetheus_benchmark_cpu_usage" + "epimetheus_benchmark_memory_bytes" + "epimetheus_benchmark_disk_io_bytes" + "epimetheus_benchmark_network_rx_bytes" + "epimetheus_benchmark_network_tx_bytes" + "epimetheus_benchmark_requests_total" + "epimetheus_benchmark_errors_total" + "epimetheus_benchmark_response_time_ms" + "epimetheus_benchmark_active_connections" + "epimetheus_benchmark_queue_depth" + ) + + for METRIC in "${METRICS[@]}"; do + echo " Deleting: $METRIC" + curl -s -X POST "http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]=$METRIC" > /dev/null + done + + echo "" + echo "Triggering tombstone cleanup (this removes deleted data from disk)..." + curl -s -X POST "http://localhost:9090/api/v1/admin/tsdb/clean_tombstones" > /dev/null + + echo "" + echo "✅ Cleanup complete!" +elif [ "$ADMIN_CHECK" = "405" ]; then + echo "❌ Admin API is NOT enabled" + echo "" + echo "To enable the Admin API, update your Prometheus configuration:" + echo "" + echo "In f3s/prometheus/persistence-values.yaml, add:" + echo "" + echo "prometheus:" + echo " prometheusSpec:" + echo " additionalArgs:" + echo " - name: web.enable-admin-api" + echo " value: \"\"" + echo "" + echo "Then upgrade Prometheus:" + echo " cd /home/paul/git/conf/f3s/prometheus" + echo " just upgrade" + echo "" + echo "WARNING: Admin API should only be enabled in development/test environments!" + echo "" + echo "Alternative: Delete benchmark data files manually:" + echo " kubectl exec -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0 -- sh -c 'rm -rf /prometheus/data/wal/*'" + echo " kubectl delete pod -n monitoring prometheus-prometheus-kube-prometheus-prometheus-0" +else + echo "⚠️ Unexpected response: HTTP $ADMIN_CHECK" +fi + +echo "" +echo "Cleaning up port-forward..." +kill $PF_PID 2>/dev/null || true + +echo "" +echo "Done!" 
diff --git a/generate-test-data.sh b/generate-test-data.sh new file mode 100755 index 0000000..a4a0b1b --- /dev/null +++ b/generate-test-data.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +# Generate test data with actual timestamps for different time ranges + +NOW=$(date +%s)000 # Current time in milliseconds +ONE_HOUR_AGO=$((NOW - 3600000)) +ONE_DAY_AGO=$((NOW - 86400000)) +ONE_WEEK_AGO=$((NOW - 604800000)) +ONE_MONTH_AGO=$((NOW - 2592000000)) + +cat > test-all-ages.csv << EOF +# Prometheus metrics in CSV format demonstrating all time ranges +# Format: metric_name,labels,value,timestamp_ms + +# CURRENT data (< 5min old - will use Pushgateway/Realtime) +app_requests_total,instance=current;env=prod,100,$NOW +app_temperature_celsius,instance=current;zone=us-east,22.5,$NOW +app_active_connections,instance=current;env=prod,50,$NOW + +# 1 HOUR OLD data (will use Remote Write/Historic) +app_requests_total,instance=1h_ago;env=prod,95,$ONE_HOUR_AGO +app_active_connections,instance=1h_ago;env=prod,45,$ONE_HOUR_AGO +app_temperature_celsius,instance=1h_ago;zone=us-east,21.8,$ONE_HOUR_AGO + +# 1 DAY OLD data (will use Remote Write/Historic) +app_requests_total,instance=1d_ago;env=prod,150,$ONE_DAY_AGO +app_temperature_celsius,instance=1d_ago;zone=eu-west,18.3,$ONE_DAY_AGO +app_active_connections,instance=1d_ago;env=prod,60,$ONE_DAY_AGO + +# 1 WEEK OLD data (will use Remote Write/Historic) +app_requests_total,instance=1w_ago;env=prod,200,$ONE_WEEK_AGO +app_jobs_processed_total,instance=1w_ago;env=prod;job_type=email;status=success,75,$ONE_WEEK_AGO +app_temperature_celsius,instance=1w_ago;zone=asia,25.2,$ONE_WEEK_AGO + +# 1 MONTH OLD data (will use Remote Write/Historic) +app_requests_total,instance=1m_ago;env=prod,180,$ONE_MONTH_AGO +app_active_connections,instance=1m_ago;env=prod,30,$ONE_MONTH_AGO +app_temperature_celsius,instance=1m_ago;zone=africa,28.7,$ONE_MONTH_AGO +EOF + +echo "Generated test-all-ages.csv with the following timestamps:" +echo " Current: $NOW ($(date -d 
@$((NOW/1000)) '+%Y-%m-%d %H:%M:%S'))" +echo " 1h ago: $ONE_HOUR_AGO ($(date -d @$((ONE_HOUR_AGO/1000)) '+%Y-%m-%d %H:%M:%S'))" +echo " 1d ago: $ONE_DAY_AGO ($(date -d @$((ONE_DAY_AGO/1000)) '+%Y-%m-%d %H:%M:%S'))" +echo " 1w ago: $ONE_WEEK_AGO ($(date -d @$((ONE_WEEK_AGO/1000)) '+%Y-%m-%d %H:%M:%S'))" +echo " 1m ago: $ONE_MONTH_AGO ($(date -d @$((ONE_MONTH_AGO/1000)) '+%Y-%m-%d %H:%M:%S'))" @@ -0,0 +1,26 @@ +module epimetheus + +go 1.24.0 + +require ( + github.com/golang/snappy v1.0.0 + github.com/magefile/mage v1.15.0 + github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/prometheus v0.308.1 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.67.4 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + golang.org/x/sys v0.37.0 // indirect + golang.org/x/text v0.30.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect +) @@ -0,0 +1,83 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853 h1:cLN4IBkmkYZNnk7EAJ0BHIethd+J6LqxFNw5mSiI2bM= +github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= 
+github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.4 h1:yR3NqWO1/UyO1w2PhUvXlGQs/PtFmoveVO0KZ4+Lvsc= +github.com/prometheus/common v0.67.4/go.mod h1:gP0fq6YjjNCLssJCQp0yk4M8W6ikLURwkdd/YKtTbyI= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/prometheus/prometheus v0.308.1 h1:ApMNI/3/es3Ze90Z7CMb+wwU2BsSYur0m5VKeqHj7h4= +github.com/prometheus/prometheus v0.308.1/go.mod h1:aHjYCDz9zKRyoUXvMWvu13K9XHOkBB12XrEqibs3e0A= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= 
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..dea804b --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,57 @@ +package config + +import "time" + +// Mode represents the ingestion mode +type Mode string + +const ( + ModeRealtime Mode = "realtime" + ModeHistoric Mode = "historic" + ModeBackfill Mode = "backfill" + ModeAuto Mode = "auto" + ModeWatch Mode = "watch" +) + +// Config holds all configuration for the prometheus-pusher +type Config struct { + Mode Mode + PushgatewayURL string + PrometheusURL string + ClickHouseURL string // ClickHouse HTTP URL (e.g. 
http://localhost:8123)
	ClickHouseTable string // ClickHouse table name (default: epimetheus_metrics)
	JobName         string // Pushgateway job label used for realtime pushes
	Continuous      bool   // presumably loops ingestion instead of one-shot — confirm against cmd wiring
	InputFile       string // path of the metrics file to ingest
	InputFormat     string // input file format (default: csv)
	MetricName      string // single metric to operate on, where applicable
	HoursAgo        int // hours in the past (historic mode); default 24
	StartHours      int // presumably the backfill window start, hours before now — confirm
	EndHours        int // presumably the backfill window end, hours before now (0 = now) — confirm
	Interval        int // sample spacing; unit not stated here — TODO confirm
	ResolveIPLabels []string // Labels containing IP addresses to resolve via DNS
}

// NewConfig creates a new Config with default values.
// It returns a value, not a pointer — callers mutate their own copy.
func NewConfig() Config {
	return Config{
		Mode:            ModeRealtime,
		PushgatewayURL:  "http://localhost:9091",
		PrometheusURL:   "http://localhost:9090/api/v1/write",
		ClickHouseURL:   "", // empty by default — presumably disables ClickHouse output; confirm in caller
		ClickHouseTable: "epimetheus_metrics",
		JobName:         "example_metrics_pusher",
		InputFormat:     "csv",
		HoursAgo:        24,
		StartHours:      48,
		EndHours:        0,
		Interval:        1,
		ResolveIPLabels: []string{"ip"}, // Default resolves 'ip' label
	}
}

// AutoIngestThreshold is the age threshold for auto mode routing:
// samples older than this go to Remote Write (historic), newer ones
// to Pushgateway (realtime).
const AutoIngestThreshold = 5 * time.Minute

// DefaultHTTPTimeout is the default timeout for HTTP requests
const DefaultHTTPTimeout = 10 * time.Second
diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..da073c4 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,52 @@
package config

import (
	"testing"
	"time"
)

// TestNewConfig verifies every default that NewConfig sets.
func TestNewConfig(t *testing.T) {
	cfg := NewConfig()

	if cfg.Mode != ModeRealtime {
		t.Errorf("Default mode = %v, want %v", cfg.Mode, ModeRealtime)
	}
	if cfg.PushgatewayURL != "http://localhost:9091" {
		t.Errorf("Default PushgatewayURL = %v, want http://localhost:9091", cfg.PushgatewayURL)
	}
	if cfg.PrometheusURL != "http://localhost:9090/api/v1/write" {
		t.Errorf("Default PrometheusURL = %v, want http://localhost:9090/api/v1/write", cfg.PrometheusURL)
	}
	if cfg.JobName != "example_metrics_pusher" {
		t.Errorf("Default JobName = %v, want example_metrics_pusher", cfg.JobName)
	}
	if cfg.InputFormat != "csv" {
		t.Errorf("Default InputFormat = %v, want csv", cfg.InputFormat)
	}
if cfg.HoursAgo != 24 { + t.Errorf("Default HoursAgo = %v, want 24", cfg.HoursAgo) + } + if cfg.Interval != 1 { + t.Errorf("Default Interval = %v, want 1", cfg.Interval) + } +} + +func TestModeConstants(t *testing.T) { + modes := []Mode{ModeRealtime, ModeHistoric, ModeBackfill, ModeAuto} + expected := []string{"realtime", "historic", "backfill", "auto"} + + for i, mode := range modes { + if string(mode) != expected[i] { + t.Errorf("Mode constant %d = %v, want %v", i, mode, expected[i]) + } + } +} + +func TestConstants(t *testing.T) { + if AutoIngestThreshold != 5*time.Minute { + t.Errorf("AutoIngestThreshold = %v, want 5m", AutoIngestThreshold) + } + if DefaultHTTPTimeout != 10*time.Second { + t.Errorf("DefaultHTTPTimeout = %v, want 10s", DefaultHTTPTimeout) + } +} diff --git a/internal/ingester/auto.go b/internal/ingester/auto.go new file mode 100644 index 0000000..315d767 --- /dev/null +++ b/internal/ingester/auto.go @@ -0,0 +1,145 @@ +package ingester + +import ( + "context" + "fmt" + "log" + "time" + + "epimetheus/internal/config" + "epimetheus/internal/metrics" +) + +const ageThreshold = 5 * time.Minute + +// DetermineMode automatically determines which ingestion mode to use based on timestamp age. +// Data older than 5 minutes uses historic mode (Remote Write), newer data uses realtime mode (Pushgateway). +func DetermineMode(timestamp time.Time) config.Mode { + age := time.Since(timestamp) + if age > ageThreshold { + return config.ModeHistoric + } + return config.ModeRealtime +} + +// AutoIngester handles automatic ingestion by routing samples to appropriate ingesters. +type AutoIngester struct { + pushgateway PushgatewayIngester + remoteWrite RemoteWriteIngester + collectors metrics.Collectors +} + +// NewAutoIngester creates a new auto ingester. 
+func NewAutoIngester(collectors metrics.Collectors) AutoIngester { + return AutoIngester{ + pushgateway: NewPushgatewayIngester(), + remoteWrite: NewRemoteWriteIngester(), + collectors: collectors, + } +} + +// Ingest automatically routes samples to appropriate ingestion method based on timestamp age. +func (a AutoIngester) Ingest(ctx context.Context, samples []metrics.Sample, cfg config.Config) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + realtimeSamples, historicSamples := groupSamplesByMode(samples) + + logIngestSummary(len(samples), len(realtimeSamples), len(historicSamples)) + + if len(realtimeSamples) > 0 { + if err := a.ingestRealtime(ctx, cfg); err != nil { + return fmt.Errorf("failed to ingest realtime samples: %w", err) + } + } + + if len(historicSamples) > 0 { + if err := a.ingestHistoric(ctx, historicSamples, cfg); err != nil { + return fmt.Errorf("failed to ingest historic samples: %w", err) + } + } + + log.Printf("\n🎉 Auto-ingest complete!") + return nil +} + +// groupSamplesByMode separates samples into realtime and historic groups. +func groupSamplesByMode(samples []metrics.Sample) (realtime, historic []metrics.Sample) { + realtimeSamples := make([]metrics.Sample, 0) + historicSamples := make([]metrics.Sample, 0) + + for _, sample := range samples { + if DetermineMode(sample.Timestamp) == config.ModeRealtime { + realtimeSamples = append(realtimeSamples, sample) + } else { + historicSamples = append(historicSamples, sample) + } + } + + return realtimeSamples, historicSamples +} + +// logIngestSummary logs the ingestion summary. +func logIngestSummary(total, realtime, historic int) { + log.Printf("📊 Auto-ingest summary:") + log.Printf(" Total samples: %d", total) + log.Printf(" Realtime samples (< 5min old): %d", realtime) + log.Printf(" Historic samples (> 5min old): %d", historic) +} + +// ingestRealtime ingests realtime samples via Pushgateway. 
+func (a AutoIngester) ingestRealtime(ctx context.Context, cfg config.Config) error { + log.Printf("\n🔄 Ingesting REALTIME samples via Pushgateway...") + log.Printf(" Note: Pushgateway uses current timestamp (original timestamps ignored)") + + if err := a.pushgateway.Ingest(ctx, a.collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + return err + } + + log.Printf("✅ Successfully ingested realtime samples") + return nil +} + +// ingestHistoric ingests historic samples via Remote Write. +func (a AutoIngester) ingestHistoric(ctx context.Context, samples []metrics.Sample, cfg config.Config) error { + log.Printf("\n⏰ Ingesting %d HISTORIC samples via Remote Write...", len(samples)) + + // Log a few sample details instead of all samples + samplesToLog := 3 + if len(samples) < samplesToLog { + samplesToLog = len(samples) + } + + for i := 0; i < samplesToLog; i++ { + age := time.Since(samples[i].Timestamp) + log.Printf(" Sample %d: %s (age: %s)", i+1, samples[i].MetricName, formatDuration(age)) + } + + if len(samples) > samplesToLog { + // Show oldest and newest sample ages + oldestAge := time.Since(samples[0].Timestamp) + newestAge := time.Since(samples[len(samples)-1].Timestamp) + log.Printf(" ... (%d more samples)", len(samples)-samplesToLog) + log.Printf(" Age range: %s (oldest) to %s (newest)", formatDuration(oldestAge), formatDuration(newestAge)) + } + + if err := a.remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil { + return err + } + + log.Printf("✅ Successfully ingested %d historic samples", len(samples)) + return nil +} + +// formatDuration formats a duration in human-readable form. 
+func formatDuration(d time.Duration) string { + if d < time.Minute { + return fmt.Sprintf("%.0f seconds", d.Seconds()) + } else if d < time.Hour { + return fmt.Sprintf("%.0f minutes", d.Minutes()) + } else if d < 24*time.Hour { + return fmt.Sprintf("%.1f hours", d.Hours()) + } + return fmt.Sprintf("%.1f days", d.Hours()/24) +} diff --git a/internal/ingester/auto_test.go b/internal/ingester/auto_test.go new file mode 100644 index 0000000..fc1c423 --- /dev/null +++ b/internal/ingester/auto_test.go @@ -0,0 +1,164 @@ +package ingester + +import ( + "context" + "testing" + "time" + + "epimetheus/internal/config" + "epimetheus/internal/metrics" +) + +func TestDetermineMode(t *testing.T) { + tests := []struct { + name string + timestamp time.Time + want config.Mode + }{ + { + name: "current time is realtime", + timestamp: time.Now(), + want: config.ModeRealtime, + }, + { + name: "1 minute ago is realtime", + timestamp: time.Now().Add(-1 * time.Minute), + want: config.ModeRealtime, + }, + { + name: "4 minutes ago is realtime", + timestamp: time.Now().Add(-4 * time.Minute), + want: config.ModeRealtime, + }, + { + name: "6 minutes ago is historic", + timestamp: time.Now().Add(-6 * time.Minute), + want: config.ModeHistoric, + }, + { + name: "1 hour ago is historic", + timestamp: time.Now().Add(-1 * time.Hour), + want: config.ModeHistoric, + }, + { + name: "1 day ago is historic", + timestamp: time.Now().Add(-24 * time.Hour), + want: config.ModeHistoric, + }, + { + name: "exactly 5 minutes is historic (edge case)", + timestamp: time.Now().Add(-5 * time.Minute), + want: config.ModeHistoric, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := DetermineMode(tt.timestamp) + if got != tt.want { + age := time.Since(tt.timestamp) + t.Errorf("DetermineMode() = %v, want %v (age: %v)", got, tt.want, age) + } + }) + } +} + +func TestGroupSamplesByMode(t *testing.T) { + now := time.Now() + samples := []metrics.Sample{ + {MetricName: "metric1", 
		Timestamp: now.Add(-1 * time.Minute)}, // realtime
		{MetricName: "metric2", Timestamp: now.Add(-2 * time.Minute)}, // realtime
		{MetricName: "metric3", Timestamp: now.Add(-10 * time.Minute)}, // historic
		{MetricName: "metric4", Timestamp: now.Add(-1 * time.Hour)}, // historic
		{MetricName: "metric5", Timestamp: now.Add(-30 * time.Second)}, // realtime
	}

	realtime, historic := groupSamplesByMode(samples)

	if len(realtime) != 3 {
		t.Errorf("Got %d realtime samples, want 3", len(realtime))
	}
	if len(historic) != 2 {
		t.Errorf("Got %d historic samples, want 2", len(historic))
	}

	// Verify correct grouping
	for _, s := range realtime {
		if DetermineMode(s.Timestamp) != config.ModeRealtime {
			t.Errorf("Sample %s incorrectly grouped as realtime (age: %v)", s.MetricName, s.Age())
		}
	}
	for _, s := range historic {
		if DetermineMode(s.Timestamp) != config.ModeHistoric {
			t.Errorf("Sample %s incorrectly grouped as historic (age: %v)", s.MetricName, s.Age())
		}
	}
}

// TestFormatDuration checks each unit tier of formatDuration.
func TestFormatDuration(t *testing.T) {
	tests := []struct {
		name     string
		duration time.Duration
		want     string
	}{
		{
			name:     "seconds",
			duration: 45 * time.Second,
			want:     "45 seconds",
		},
		{
			name:     "minutes",
			duration: 5 * time.Minute,
			want:     "5 minutes",
		},
		{
			name:     "hours",
			duration: 2*time.Hour + 30*time.Minute,
			want:     "2.5 hours",
		},
		{
			name:     "days",
			duration: 36 * time.Hour,
			want:     "1.5 days",
		},
		{
			name:     "less than minute",
			duration: 30 * time.Second,
			want:     "30 seconds",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := formatDuration(tt.duration)
			if got != tt.want {
				t.Errorf("formatDuration() = %v, want %v", got, tt.want)
			}
		})
	}
}

// TestAutoIngester_Ingest_EmptySamples asserts that an empty sample slice
// is rejected with a specific error before any network work happens.
func TestAutoIngester_Ingest_EmptySamples(t *testing.T) {
	collectors := metrics.NewCollectors()
	autoIngester := NewAutoIngester(collectors)
	ctx := context.Background()
	cfg := config.NewConfig()

	err := autoIngester.Ingest(ctx, []metrics.Sample{}, cfg)
	if err == nil {
		t.Error("Expected error for empty samples, got nil")
	}
	if err.Error() != "no samples to ingest" {
		t.Errorf("Expected 'no samples to ingest' error, got: %v", err)
	}
}

// TestAutoIngester_New is a smoke test that the constructor wires up
// its collector set.
func TestAutoIngester_New(t *testing.T) {
	collectors := metrics.NewCollectors()
	ingester := NewAutoIngester(collectors)

	// Verify ingester was created with components
	if ingester.collectors.RequestsTotal == nil {
		t.Error("AutoIngester.collectors not initialized properly")
	}
}
diff --git a/internal/ingester/clickhouse.go b/internal/ingester/clickhouse.go new file mode 100644 index 0000000..2439d4e --- /dev/null +++ b/internal/ingester/clickhouse.go @@ -0,0 +1,191 @@
package ingester

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"epimetheus/internal/metrics"
)

const (
	clickhouseBatchSize   = 10000                // max rows per INSERT request
	clickhouseHTTPTimeout = 30 * time.Second     // per-request client timeout
	defaultTableName      = "epimetheus_metrics" // used when config leaves the table blank
)

// ClickHouseIngester ingests metrics into ClickHouse.
type ClickHouseIngester struct {
	client    *http.Client // shared client with clickhouseHTTPTimeout
	baseURL   string       // ClickHouse HTTP endpoint, normalized to have no trailing slash
	tableName string
}

// NewClickHouseIngester creates a new ClickHouse ingester.
//
// NOTE(review): only surrounding whitespace and a trailing slash are
// normalized here — a missing URL scheme is NOT added; the caller must
// supply one (e.g. http://localhost:8123).
func NewClickHouseIngester(baseURL, tableName string) ClickHouseIngester {
	if tableName == "" {
		tableName = defaultTableName
	}
	// Trim whitespace and a trailing slash so "/?query=..." appends cleanly.
	baseURL = strings.TrimSuffix(strings.TrimSpace(baseURL), "/")

	return ClickHouseIngester{
		client: &http.Client{
			Timeout: clickhouseHTTPTimeout,
		},
		baseURL:   baseURL,
		tableName: tableName,
	}
}

// EnsureTable creates the metrics table if it does not exist.
+func (c ClickHouseIngester) EnsureTable(ctx context.Context) error { + createSQL := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + metric String, + labels Map(String, String), + value Float64, + timestamp DateTime64(3) + ) ENGINE = MergeTree() + ORDER BY (metric, timestamp) + `, c.tableName) + + reqURL := c.baseURL + "/?query=" + url.QueryEscape(createSQL) + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("execute create table: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("create table failed (status %d): %s", resp.StatusCode, string(body)) + } + + return nil +} + +// Ingest inserts samples into ClickHouse in batches. +func (c ClickHouseIngester) Ingest(ctx context.Context, samples []metrics.Sample) error { + if len(samples) == 0 { + return fmt.Errorf("no samples to ingest") + } + + if err := c.EnsureTable(ctx); err != nil { + return fmt.Errorf("ensure table: %w", err) + } + + batches := chunkSamplesForClickHouse(samples, clickhouseBatchSize) + totalBatches := len(batches) + + log.Printf("ClickHouse: ingesting %d samples in %d batches", len(samples), totalBatches) + + var wg sync.WaitGroup + errChan := make(chan error, totalBatches) + + for i, batch := range batches { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + wg.Add(1) + go func(idx int, b []metrics.Sample) { + defer wg.Done() + if err := c.insertBatch(ctx, b); err != nil { + errChan <- fmt.Errorf("batch %d: %w", idx+1, err) + } + }(i, batch) + } + + wg.Wait() + close(errChan) + + var firstErr error + for err := range errChan { + if firstErr == nil { + firstErr = err + } + log.Printf("ClickHouse batch error: %v", err) + } + + return firstErr +} + +// insertBatch sends a single batch via HTTP JSONEachRow. 
+func (c ClickHouseIngester) insertBatch(ctx context.Context, samples []metrics.Sample) error { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + for _, s := range samples { + // Convert labels map to format ClickHouse Map expects + labelsMap := make(map[string]string, len(s.Labels)) + for k, v := range s.Labels { + labelsMap[k] = v + } + + row := struct { + Metric string `json:"metric"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + Timestamp string `json:"timestamp"` + }{ + Metric: s.MetricName, + Labels: labelsMap, + Value: s.Value, + Timestamp: s.Timestamp.UTC().Format("2006-01-02 15:04:05.000"), + } + + if err := enc.Encode(row); err != nil { + return fmt.Errorf("encode row: %w", err) + } + } + + query := fmt.Sprintf("INSERT INTO %s FORMAT JSONEachRow", c.tableName) + reqURL := c.baseURL + "/?query=" + url.QueryEscape(query) + + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, &buf) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("insert failed (status %d): %s", resp.StatusCode, string(body)) + } + + return nil +} + +func chunkSamplesForClickHouse(samples []metrics.Sample, size int) [][]metrics.Sample { + var batches [][]metrics.Sample + for i := 0; i < len(samples); i += size { + end := i + size + if end > len(samples) { + end = len(samples) + } + batches = append(batches, samples[i:end]) + } + return batches +} diff --git a/internal/ingester/pushgateway.go b/internal/ingester/pushgateway.go new file mode 100644 index 0000000..c5c80c3 --- /dev/null +++ b/internal/ingester/pushgateway.go @@ -0,0 +1,51 @@ +package ingester + +import ( + "context" + "fmt" + + "epimetheus/internal/metrics" + + 
	"github.com/prometheus/client_golang/prometheus/push"
)

// PushgatewayIngester handles realtime metric ingestion via Pushgateway.
// Note: Pushgateway does not preserve custom timestamps - all metrics are
// timestamped with the current time when pushed.
type PushgatewayIngester struct{}

// NewPushgatewayIngester creates a new Pushgateway ingester.
func NewPushgatewayIngester() PushgatewayIngester {
	return PushgatewayIngester{}
}

// Ingest pushes one round of metrics to the Pushgateway at url under jobName.
// There is no samples parameter: Pushgateway cannot replay historic samples,
// so this method simulates fresh values on the registered collectors and
// pushes those instead. Callers that need original timestamps preserved
// must use the Remote Write path.
func (i PushgatewayIngester) Ingest(ctx context.Context, collectors metrics.Collectors, url, jobName string) error {
	// Bail out early if the caller's context is already cancelled.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Generate random metric values
	collectors.Simulate()

	// Create pusher with all collectors
	pusher := push.New(url, jobName).
		Collector(collectors.RequestsTotal).
		Collector(collectors.ActiveConnections).
		Collector(collectors.TemperatureCelsius).
		Collector(collectors.RequestDuration).
		Collector(collectors.JobsProcessed).
		Grouping("instance", "example-app")

	// Push metrics to Pushgateway
	if err := pusher.Push(); err != nil {
		return fmt.Errorf("failed to push to pushgateway: %w", err)
	}

	return nil
}
diff --git a/internal/ingester/pushgateway_test.go b/internal/ingester/pushgateway_test.go new file mode 100644 index 0000000..55fc213 --- /dev/null +++ b/internal/ingester/pushgateway_test.go @@ -0,0 +1,28 @@
package ingester

import (
	"testing"

	"epimetheus/internal/metrics"
)

// TestNewPushgatewayIngester is a construction smoke test.
// NOTE(review): it asserts nothing — PushgatewayIngester is an empty value
// type, so there is nothing to verify without a live Pushgateway.
func TestNewPushgatewayIngester(t *testing.T) {
	ingester := NewPushgatewayIngester()

	// Verify the ingester was created (value type, so no nil check needed)
	_ = ingester
}

// TestPushgatewayIngester_Type only verifies that collectors construct;
// it does not exercise Ingest (that would require a reachable Pushgateway).
func TestPushgatewayIngester_Type(t *testing.T) {
	// Test that we can create and use the ingester
	collectors := metrics.NewCollectors()
	ingester := NewPushgatewayIngester()

	// The ingester should work with collectors
	if collectors.RequestsTotal == nil {
		t.Error("Collectors not initialized properly")
	}

	// Verify ingester is the correct type
	_ = ingester
}
diff --git a/internal/ingester/remotewrite.go b/internal/ingester/remotewrite.go new file mode 100644 index 0000000..b88b7fa --- /dev/null +++ b/internal/ingester/remotewrite.go @@ -0,0 +1,455 @@
package ingester

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"epimetheus/internal/metrics"

	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

const (
	// requestTimeout bounds each Remote Write HTTP request.
	// NOTE(review): duplicates config.DefaultHTTPTimeout (10s) — consider
	// sharing one constant so the two cannot drift.
	requestTimeout = 10 * time.Second
	backfillDelay  = 100 * time.Millisecond
	// BatchSize defines the maximum number of samples per Remote Write request
	// Prometheus has a 32MB limit, so we keep batches small to avoid rejection
	BatchSize = 5000
	// NumWorkers defines the number of concurrent goroutines for batch processing
	// Higher values increase throughput but may overwhelm Prometheus
	NumWorkers = 10
)

// Buffer pool for reusing buffers across requests (reduces GC
pressure)
// NOTE(review): none of the four pools below is referenced by any code
// visible in this chunk (Ingest/chunkSamples do not use them) — confirm
// sendWriteRequest/convertSamplesToTimeSeries actually Get/Put from them,
// otherwise they are dead weight.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}

// Preallocated buffer pool for protobuf marshaling
var protoBufferPool = sync.Pool{
	New: func() interface{} {
		// Preallocate ~500KB buffer (typical batch size)
		buf := make([]byte, 0, 512*1024)
		return &buf
	},
}

// TimeSeries object pool for reuse
var timeSeriesPool = sync.Pool{
	New: func() interface{} {
		return &prompb.TimeSeries{
			Labels:  make([]prompb.Label, 0, 10), // Typical label count
			Samples: make([]prompb.Sample, 0, 1),
		}
	},
}

// Label slice pool for reuse
var labelSlicePool = sync.Pool{
	New: func() interface{} {
		labels := make([]prompb.Label, 0, 10)
		return &labels
	},
}

// RemoteWriteIngester handles historic metric ingestion via Prometheus Remote Write API.
// This ingester preserves custom timestamps, making it suitable for importing historic data.
type RemoteWriteIngester struct {
	client *http.Client // shared, pooled client built by NewRemoteWriteIngester
}

// NewRemoteWriteIngester creates a new Remote Write ingester with optimized HTTP client.
func NewRemoteWriteIngester() RemoteWriteIngester {
	// Optimized HTTP transport with connection pooling
	transport := &http.Transport{
		MaxIdleConns:        100,              // Global connection pool
		MaxIdleConnsPerHost: NumWorkers,       // Match worker count
		MaxConnsPerHost:     NumWorkers,       // Limit concurrent connections per host
		IdleConnTimeout:     90 * time.Second, // Keep connections alive longer
		DisableKeepAlives:   false,            // CRITICAL: enable keep-alive
		DisableCompression:  true,             // We compress manually with Snappy
		ForceAttemptHTTP2:   true,             // Use HTTP/2 if available
		WriteBufferSize:     64 * 1024,        // 64KB write buffer
		ReadBufferSize:      64 * 1024,        // 64KB read buffer
	}

	return RemoteWriteIngester{
		client: &http.Client{
			Timeout:   requestTimeout,
			Transport: transport,
		},
	}
}

// Ingest sends samples to Prometheus via Remote Write API with batching and concurrency.
// Large datasets are automatically split into batches and processed by a worker pool
// to avoid exceeding Prometheus's 32MB limit and maximize throughput.
// The call blocks until every batch has been attempted. It returns nil only if
// all batches succeeded, an error summarizing the failure count otherwise; an
// empty samples slice is rejected with an error.
func (i RemoteWriteIngester) Ingest(ctx context.Context, samples []metrics.Sample, url string) error {
	if len(samples) == 0 {
		return fmt.Errorf("no samples to ingest")
	}

	// Fast-fail if the caller has already cancelled.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Split samples into batches
	batches := chunkSamples(samples, BatchSize)
	totalBatches := len(batches)

	log.Printf("Splitting %d samples into %d batches (batch size: %d)", len(samples), totalBatches, BatchSize)
	log.Printf("Processing batches with %d concurrent workers", NumWorkers)

	// Counters for tracking progress (atomic for thread safety)
	var successCount int32
	var errorCount int32
	var processedCount int32

	// Worker pool pattern. Both channels are buffered to totalBatches, so the
	// producer loop below and the workers can never block on a full channel.
	batchChan := make(chan batchJob, totalBatches)
	errorsChan := make(chan error, totalBatches)
	var wg sync.WaitGroup

	// Start workers
	for w := 0; w < NumWorkers; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			for job := range batchChan {
				// Check context before processing.
				// NOTE(review): a cancellation error is sent on errorsChan but
				// does not increment errorCount, so the final tally can
				// undercount failures after cancellation — confirm intended.
				select {
				case <-ctx.Done():
					errorsChan <- fmt.Errorf("worker %d cancelled", workerID)
					return
				default:
				}

				// Convert and send batch
				timeSeries := convertSamplesToTimeSeries(job.batch)
				writeRequest := &prompb.WriteRequest{Timeseries: timeSeries}

				if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil {
					atomic.AddInt32(&errorCount, 1)
					errorsChan <- fmt.Errorf("batch %d: %w", job.index, err)
				} else {
					atomic.AddInt32(&successCount, 1)
				}

				// Update progress every 10th batch and on the final batch.
				processed := atomic.AddInt32(&processedCount, 1)
				if processed%10 == 0 || int(processed) == totalBatches {
					progress := float64(processed) / float64(totalBatches) * 100
					log.Printf("Progress: %.1f%% (%d/%d batches, %d success, %d errors)",
						progress, processed, totalBatches, atomic.LoadInt32(&successCount),
						atomic.LoadInt32(&errorCount))
				}
			}
		}(w)
	}

	// Send batches to workers (cannot block: batchChan holds totalBatches)
	for idx, batch := range batches {
		batchChan <- batchJob{index: idx + 1, batch: batch}
	}
	close(batchChan)

	// Wait for all workers to finish
	wg.Wait()
	close(errorsChan)

	// Collect errors.
	// NOTE(review): firstError is captured here but never returned or logged;
	// either wrap it into the returned error or drop the variable.
	var firstError error
	errorList := make([]error, 0)
	for err := range errorsChan {
		errorList = append(errorList, err)
		if firstError == nil {
			firstError = err
		}
	}

	finalSuccess := int(atomic.LoadInt32(&successCount))
	finalErrors := int(atomic.LoadInt32(&errorCount))

	log.Printf("Batch ingestion complete: %d successful, %d errors", finalSuccess, finalErrors)

	if finalErrors > 0 {
		// Log first few errors as examples
		numToLog := 5
		if len(errorList) < numToLog {
			numToLog = len(errorList)
		}
		log.Printf("Sample errors (showing %d of %d):", numToLog, finalErrors)
		for i := 0; i < numToLog; i++ {
			log.Printf("  - %v", errorList[i])
		}
		return fmt.Errorf("completed with %d/%d batches failed", finalErrors, totalBatches)
	}

	return nil
}

// batchJob represents a batch to be processed by a worker.
// index is 1-based and used only in error messages and logs.
type batchJob struct {
	index int
	batch []metrics.Sample
}

// chunkSamples splits samples into batches of at most batchSize elements.
// The returned sub-slices alias the input's backing array (no copying).
func chunkSamples(samples []metrics.Sample, batchSize int) [][]metrics.Sample {
	var batches [][]metrics.Sample

	for i := 0; i < len(samples); i += batchSize {
		end := i + batchSize
		if end > len(samples) {
			end = len(samples)
		}
		batches = append(batches, samples[i:end])
	}

	return batches
}

// IngestHistoric generates and ingests historic metrics for a specific time in the past.
+func (i RemoteWriteIngester) IngestHistoric(ctx context.Context, url string, hoursAgo int) error { + timestamp := time.Now().Add(-time.Duration(hoursAgo) * time.Hour) + timeSeries := generateHistoricTimeSeries(timestamp) + writeRequest := &prompb.WriteRequest{Timeseries: timeSeries} + + if err := i.sendWriteRequest(ctx, url, writeRequest); err != nil { + return err + } + + log.Printf("Successfully pushed historic data for %d hours ago (timestamp: %s)", + hoursAgo, timestamp.Format(time.RFC3339)) + return nil +} + +// Backfill ingests historic metrics for a range of time points. +func (i RemoteWriteIngester) Backfill(ctx context.Context, url string, startHoursAgo, endHoursAgo, intervalHours int) error { + log.Printf("Starting backfill from %d hours ago to %d hours ago (interval: %d hours)", + startHoursAgo, endHoursAgo, intervalHours) + + successCount := 0 + errorCount := 0 + + for hoursAgo := startHoursAgo; hoursAgo >= endHoursAgo; hoursAgo -= intervalHours { + if err := i.IngestHistoric(ctx, url, hoursAgo); err != nil { + log.Printf("Error pushing data for %d hours ago: %v", hoursAgo, err) + errorCount++ + } else { + successCount++ + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backfillDelay): + } + } + + log.Printf("Backfill complete: %d successful, %d errors", successCount, errorCount) + + if errorCount > 0 { + return fmt.Errorf("backfill completed with %d errors", errorCount) + } + + return nil +} + +// sendWriteRequest sends a write request to Prometheus using pooled buffers. 
func (i RemoteWriteIngester) sendWriteRequest(ctx context.Context, url string, writeRequest *prompb.WriteRequest) error {
	// Get a scratch byte slice from the pool to use as the Snappy destination.
	protoBufPtr := protoBufferPool.Get().(*[]byte)
	protoBuf := (*protoBufPtr)[:0] // Reset length but keep capacity
	defer protoBufferPool.Put(protoBufPtr)

	// Marshal the protobuf payload.
	// NOTE(review): Marshal() allocates its own slice; the pooled buffer above
	// is only used as the compression destination, so marshaling itself is not
	// zero-alloc despite the pooling.
	data, err := writeRequest.Marshal()
	if err != nil {
		return fmt.Errorf("failed to marshal write request: %w", err)
	}

	// Compress into the pooled buffer (snappy.Encode reuses protoBuf when its
	// capacity suffices, otherwise it allocates a fresh slice).
	compressed := snappy.Encode(protoBuf, data)

	// Get request buffer from pool
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)

	buf.Write(compressed)

	req, err := http.NewRequestWithContext(ctx, "POST", url, buf)
	if err != nil {
		return fmt.Errorf("failed to create HTTP request: %w", err)
	}

	// Headers required by the Remote Write 0.1/1.0 protocol.
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	// NOTE(review): net/http ignores a Content-Length entry in Header for
	// outgoing requests — it uses req.ContentLength, which
	// NewRequestWithContext already set from the *bytes.Buffer body. This
	// Set call is a no-op and could be removed.
	req.Header.Set("Content-Length", fmt.Sprintf("%d", buf.Len()))

	resp, err := i.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send remote write request: %w", err)
	}
	defer resp.Body.Close()

	// Prometheus replies 200 or 204 on success; surface anything else along
	// with the response body for debugging.
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("remote write failed with status %d: %s", resp.StatusCode, string(body))
	}

	return nil
}

// convertSamplesToTimeSeries converts metrics.Sample to prompb.TimeSeries format with pooling.
+func convertSamplesToTimeSeries(samples []metrics.Sample) []prompb.TimeSeries { + // Preallocate with exact capacity to avoid reallocation + timeSeries := make([]prompb.TimeSeries, 0, len(samples)) + + // Reusable label slice + labelsPtr := labelSlicePool.Get().(*[]prompb.Label) + labels := *labelsPtr + defer labelSlicePool.Put(labelsPtr) + + for i := range samples { + sample := &samples[i] // Avoid copying + + // Reset labels slice for reuse + labels = labels[:0] + + // Add __name__ label + labels = append(labels, prompb.Label{Name: "__name__", Value: sample.MetricName}) + + // Add custom labels + for k, v := range sample.Labels { + labels = append(labels, prompb.Label{Name: k, Value: v}) + } + + // Copy labels (must not share slice across time series) + labelsCopy := make([]prompb.Label, len(labels)) + copy(labelsCopy, labels) + + // Create time series (reuse pattern, but we need unique objects) + timeSeries = append(timeSeries, prompb.TimeSeries{ + Labels: labelsCopy, + Samples: []prompb.Sample{{ + Value: sample.Value, + Timestamp: sample.Timestamp.UnixMilli(), + }}, + }) + } + + return timeSeries +} + +// generateHistoricTimeSeries generates example time series for a specific timestamp. +func generateHistoricTimeSeries(timestamp time.Time) []prompb.TimeSeries { + timestampMs := timestamp.UnixMilli() + var timeSeries []prompb.TimeSeries + + baseLabels := []prompb.Label{ + {Name: "instance", Value: "example-app"}, + {Name: "job", Value: "historic_data"}, + } + + timeSeries = append(timeSeries, createCounterSeries("epimetheus_test_requests_total", baseLabels, float64(rand.Intn(100)+1), timestampMs)) + timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_active_connections", baseLabels, float64(rand.Intn(100)), timestampMs)) + timeSeries = append(timeSeries, createGaugeSeries("epimetheus_test_temperature_celsius", baseLabels, 15+rand.Float64()*20, timestampMs)) + + timeSeries = append(timeSeries, generateHistogramSeries(baseLabels, timestampMs)...) 
+ timeSeries = append(timeSeries, generateLabeledCounterSeries(baseLabels, timestampMs)...) + + return timeSeries +} + +// createCounterSeries creates a counter metric time series. +func createCounterSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { + labels := []prompb.Label{{Name: "__name__", Value: name}} + labels = append(labels, baseLabels...) + + return prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: value, Timestamp: timestamp}}, + } +} + +// createGaugeSeries creates a gauge metric time series. +func createGaugeSeries(name string, baseLabels []prompb.Label, value float64, timestamp int64) prompb.TimeSeries { + return createCounterSeries(name, baseLabels, value, timestamp) +} + +// generateHistogramSeries generates histogram bucket time series. +func generateHistogramSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { + buckets := []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + var series []prompb.TimeSeries + + cumulativeCount := 0 + for _, bucket := range buckets { + cumulativeCount += rand.Intn(5) + labels := []prompb.Label{ + {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"}, + {Name: "le", Value: fmt.Sprintf("%g", bucket)}, + } + labels = append(labels, baseLabels...) + + series = append(series, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, + }) + } + + infLabels := []prompb.Label{ + {Name: "__name__", Value: "epimetheus_test_request_duration_seconds_bucket"}, + {Name: "le", Value: "+Inf"}, + } + infLabels = append(infLabels, baseLabels...) 
+ series = append(series, prompb.TimeSeries{ + Labels: infLabels, + Samples: []prompb.Sample{{Value: float64(cumulativeCount), Timestamp: timestamp}}, + }) + + series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_sum", baseLabels, rand.Float64()*100, timestamp)) + series = append(series, createCounterSeries("epimetheus_test_request_duration_seconds_count", baseLabels, float64(cumulativeCount), timestamp)) + + return series +} + +// generateLabeledCounterSeries generates labeled counter time series. +func generateLabeledCounterSeries(baseLabels []prompb.Label, timestamp int64) []prompb.TimeSeries { + jobTypes := []string{"email", "report", "backup"} + statuses := []string{"success", "failed"} + var series []prompb.TimeSeries + + for _, jobType := range jobTypes { + for _, status := range statuses { + labels := []prompb.Label{ + {Name: "__name__", Value: "epimetheus_test_jobs_processed_total"}, + {Name: "job_type", Value: jobType}, + {Name: "status", Value: status}, + } + labels = append(labels, baseLabels...) 
+ + series = append(series, prompb.TimeSeries{ + Labels: labels, + Samples: []prompb.Sample{{Value: float64(rand.Intn(20)), Timestamp: timestamp}}, + }) + } + } + + return series +} diff --git a/internal/ingester/remotewrite_test.go b/internal/ingester/remotewrite_test.go new file mode 100644 index 0000000..d1fdee6 --- /dev/null +++ b/internal/ingester/remotewrite_test.go @@ -0,0 +1,210 @@ +package ingester + +import ( + "testing" + "time" + + "epimetheus/internal/metrics" + + "github.com/prometheus/prometheus/prompb" +) + +func TestNewRemoteWriteIngester(t *testing.T) { + ingester := NewRemoteWriteIngester() + if ingester.client == nil { + t.Error("RemoteWriteIngester.client should not be nil") + } +} + +func TestConvertSamplesToTimeSeries(t *testing.T) { + now := time.Now() + samples := []metrics.Sample{ + { + MetricName: "test_metric1", + Labels: map[string]string{"env": "prod", "host": "server1"}, + Value: 42.5, + Timestamp: now, + }, + { + MetricName: "test_metric2", + Labels: map[string]string{"env": "test"}, + Value: 100.0, + Timestamp: now.Add(-1 * time.Hour), + }, + } + + timeSeries := convertSamplesToTimeSeries(samples) + + if len(timeSeries) != 2 { + t.Errorf("Expected 2 time series, got %d", len(timeSeries)) + } + + // Check first time series + ts1 := timeSeries[0] + if len(ts1.Labels) != 3 { // __name__ + 2 custom labels + t.Errorf("Expected 3 labels, got %d", len(ts1.Labels)) + } + + hasName := false + for _, label := range ts1.Labels { + if label.Name == "__name__" && label.Value == "test_metric1" { + hasName = true + } + } + if !hasName { + t.Error("Missing or incorrect __name__ label") + } + + if len(ts1.Samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(ts1.Samples)) + } + if ts1.Samples[0].Value != 42.5 { + t.Errorf("Expected value 42.5, got %f", ts1.Samples[0].Value) + } +} + +func TestGenerateHistoricTimeSeries(t *testing.T) { + timestamp := time.Now().Add(-24 * time.Hour) + + timeSeries := generateHistoricTimeSeries(timestamp) + + if 
len(timeSeries) == 0 { + t.Error("Expected time series to be generated") + } + + // Should contain various metric types + metricNames := make(map[string]bool) + for _, ts := range timeSeries { + for _, label := range ts.Labels { + if label.Name == "__name__" { + metricNames[label.Value] = true + } + } + } + + expectedMetrics := []string{ + "epimetheus_test_requests_total", + "epimetheus_test_active_connections", + "epimetheus_test_temperature_celsius", + "epimetheus_test_jobs_processed_total", + } + + for _, expected := range expectedMetrics { + if !metricNames[expected] { + t.Errorf("Expected metric %s not found", expected) + } + } +} + +func TestCreateCounterSeries(t *testing.T) { + baseLabels := []prompb.Label{ + {Name: "instance", Value: "test-instance"}, + {Name: "job", Value: "test-job"}, + } + + ts := createCounterSeries("test_counter", baseLabels, 123.45, 1234567890000) + + if len(ts.Labels) != 3 { // __name__ + 2 base labels + t.Errorf("Expected 3 labels, got %d", len(ts.Labels)) + } + + if len(ts.Samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(ts.Samples)) + } + + if ts.Samples[0].Value != 123.45 { + t.Errorf("Expected value 123.45, got %f", ts.Samples[0].Value) + } + + if ts.Samples[0].Timestamp != 1234567890000 { + t.Errorf("Expected timestamp 1234567890000, got %d", ts.Samples[0].Timestamp) + } +} + +func TestCreateGaugeSeries(t *testing.T) { + baseLabels := []prompb.Label{ + {Name: "instance", Value: "test-instance"}, + } + + ts := createGaugeSeries("test_gauge", baseLabels, 67.89, 9876543210000) + + if len(ts.Samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(ts.Samples)) + } + + if ts.Samples[0].Value != 67.89 { + t.Errorf("Expected value 67.89, got %f", ts.Samples[0].Value) + } +} + +func TestGenerateHistogramSeries(t *testing.T) { + baseLabels := []prompb.Label{ + {Name: "instance", Value: "test-instance"}, + } + timestamp := int64(1234567890000) + + series := generateHistogramSeries(baseLabels, timestamp) + + if len(series) 
== 0 { + t.Error("Expected histogram series to be generated") + } + + // Should contain buckets, +Inf, sum, and count + metricTypes := make(map[string]int) + for _, ts := range series { + for _, label := range ts.Labels { + if label.Name == "__name__" { + metricTypes[label.Value]++ + } + } + } + + if metricTypes["epimetheus_test_request_duration_seconds_bucket"] == 0 { + t.Error("Expected histogram buckets") + } + if metricTypes["epimetheus_test_request_duration_seconds_sum"] != 1 { + t.Error("Expected histogram sum") + } + if metricTypes["epimetheus_test_request_duration_seconds_count"] != 1 { + t.Error("Expected histogram count") + } +} + +func TestGenerateLabeledCounterSeries(t *testing.T) { + baseLabels := []prompb.Label{ + {Name: "instance", Value: "test-instance"}, + } + timestamp := int64(1234567890000) + + series := generateLabeledCounterSeries(baseLabels, timestamp) + + if len(series) == 0 { + t.Error("Expected labeled counter series to be generated") + } + + // Should have combinations of job types and statuses + // 3 job types * 2 statuses = 6 series + if len(series) != 6 { + t.Errorf("Expected 6 labeled counter series, got %d", len(series)) + } + + // Verify label structure + for _, ts := range series { + hasJobType := false + hasStatus := false + for _, label := range ts.Labels { + if label.Name == "job_type" { + hasJobType = true + } + if label.Name == "status" { + hasStatus = true + } + } + if !hasJobType { + t.Error("Expected job_type label") + } + if !hasStatus { + t.Error("Expected status label") + } + } +} diff --git a/internal/metrics/generator.go b/internal/metrics/generator.go new file mode 100644 index 0000000..c85906a --- /dev/null +++ b/internal/metrics/generator.go @@ -0,0 +1,85 @@ +package metrics + +import ( + "math/rand" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + minTemperature = 15.0 + maxTemperature = 35.0 + maxConnections = 100 + maxRequests = 10 +) + +var ( + jobTypes = []string{"email", "report", "backup"} 
+ statuses = []string{"success", "failed"} +) + +// Collectors holds Prometheus metric collectors for realtime mode +type Collectors struct { + RequestsTotal prometheus.Counter + ActiveConnections prometheus.Gauge + TemperatureCelsius prometheus.Gauge + RequestDuration prometheus.Histogram + JobsProcessed *prometheus.CounterVec +} + +// NewCollectors creates new Prometheus metric collectors for testing. +// All metrics are prefixed with "epimetheus_test_" to clearly indicate +// they are generated by the prometheus-pusher test/demo functionality. +func NewCollectors() Collectors { + return Collectors{ + RequestsTotal: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "epimetheus_test_requests_total", + Help: "Total number of requests processed (test metric)", + }, + ), + ActiveConnections: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "epimetheus_test_active_connections", + Help: "Number of currently active connections (test metric)", + }, + ), + TemperatureCelsius: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "epimetheus_test_temperature_celsius", + Help: "Current temperature in Celsius (test metric)", + }, + ), + RequestDuration: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "epimetheus_test_request_duration_seconds", + Help: "Histogram of request duration in seconds (test metric)", + Buckets: prometheus.DefBuckets, + }, + ), + JobsProcessed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "epimetheus_test_jobs_processed_total", + Help: "Total number of jobs processed by type (test metric)", + }, + []string{"job_type", "status"}, + ), + } +} + +// Simulate generates random metric values for the collectors +func (c Collectors) Simulate() { + c.RequestsTotal.Add(float64(rand.Intn(maxRequests) + 1)) + c.ActiveConnections.Set(float64(rand.Intn(maxConnections))) + c.TemperatureCelsius.Set(minTemperature + rand.Float64()*(maxTemperature-minTemperature)) + + for i := 0; i < rand.Intn(5)+1; i++ { + duration := 
rand.Float64() * 2 + c.RequestDuration.Observe(duration) + } + + for _, jobType := range jobTypes { + status := statuses[rand.Intn(len(statuses))] + c.JobsProcessed.WithLabelValues(jobType, status).Add(float64(rand.Intn(5))) + } +} diff --git a/internal/metrics/generator_test.go b/internal/metrics/generator_test.go new file mode 100644 index 0000000..69395eb --- /dev/null +++ b/internal/metrics/generator_test.go @@ -0,0 +1,53 @@ +package metrics + +import ( + "testing" +) + +func TestNewCollectors(t *testing.T) { + collectors := NewCollectors() + + if collectors.RequestsTotal == nil { + t.Error("RequestsTotal should not be nil") + } + if collectors.ActiveConnections == nil { + t.Error("ActiveConnections should not be nil") + } + if collectors.TemperatureCelsius == nil { + t.Error("TemperatureCelsius should not be nil") + } + if collectors.RequestDuration == nil { + t.Error("RequestDuration should not be nil") + } + if collectors.JobsProcessed == nil { + t.Error("JobsProcessed should not be nil") + } +} + +func TestCollectors_Simulate(t *testing.T) { + collectors := NewCollectors() + + // Should not panic + collectors.Simulate() + + // Run multiple times to test randomness + for i := 0; i < 10; i++ { + collectors.Simulate() + } +} + +func TestCollectors_SimulateMetrics(t *testing.T) { + collectors := NewCollectors() + + // Test that metrics get values after simulation + collectors.Simulate() + + // We can't easily inspect the values without the prometheus client, + // but we can verify the collectors were created properly + if collectors.RequestsTotal == nil { + t.Error("RequestsTotal not initialized") + } + if collectors.JobsProcessed == nil { + t.Error("JobsProcessed not initialized") + } +} diff --git a/internal/metrics/sample.go b/internal/metrics/sample.go new file mode 100644 index 0000000..04360f5 --- /dev/null +++ b/internal/metrics/sample.go @@ -0,0 +1,34 @@ +package metrics + +import "time" + +// Sample represents a single metric sample with timestamp 
+type Sample struct { + MetricName string + Labels map[string]string + Value float64 + Timestamp time.Time +} + +// NewSample creates a new Sample +func NewSample(name string, labels map[string]string, value float64, timestamp time.Time) Sample { + if labels == nil { + labels = make(map[string]string) + } + return Sample{ + MetricName: name, + Labels: labels, + Value: value, + Timestamp: timestamp, + } +} + +// Age returns how old the sample is +func (s Sample) Age() time.Duration { + return time.Since(s.Timestamp) +} + +// IsRecent returns true if the sample is recent enough for realtime ingestion +func (s Sample) IsRecent(threshold time.Duration) bool { + return s.Age() < threshold +} diff --git a/internal/metrics/sample_test.go b/internal/metrics/sample_test.go new file mode 100644 index 0000000..2ffd78b --- /dev/null +++ b/internal/metrics/sample_test.go @@ -0,0 +1,160 @@ +package metrics + +import ( + "testing" + "time" +) + +func TestNewSample(t *testing.T) { + tests := []struct { + name string + metric string + labels map[string]string + value float64 + timestamp time.Time + wantNil bool + }{ + { + name: "with labels", + metric: "test_metric", + labels: map[string]string{"env": "prod", "host": "server1"}, + value: 42.5, + timestamp: time.Now(), + wantNil: false, + }, + { + name: "nil labels initialized", + metric: "test_metric", + labels: nil, + value: 100, + timestamp: time.Now(), + wantNil: false, + }, + { + name: "empty labels", + metric: "test_metric", + labels: map[string]string{}, + value: 0, + timestamp: time.Now(), + wantNil: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sample := NewSample(tt.metric, tt.labels, tt.value, tt.timestamp) + + if sample.MetricName != tt.metric { + t.Errorf("MetricName = %v, want %v", sample.MetricName, tt.metric) + } + if sample.Value != tt.value { + t.Errorf("Value = %v, want %v", sample.Value, tt.value) + } + if sample.Labels == nil { + t.Error("Labels should never be nil") + } 
+ if !sample.Timestamp.Equal(tt.timestamp) { + t.Errorf("Timestamp = %v, want %v", sample.Timestamp, tt.timestamp) + } + }) + } +} + +func TestSample_Age(t *testing.T) { + tests := []struct { + name string + sample Sample + wantNear time.Duration + }{ + { + name: "recent sample", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-5 * time.Minute), + }, + wantNear: 5 * time.Minute, + }, + { + name: "old sample", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-1 * time.Hour), + }, + wantNear: 1 * time.Hour, + }, + { + name: "very recent", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-10 * time.Second), + }, + wantNear: 10 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + age := tt.sample.Age() + // Allow 1 second tolerance for test execution time + if age < tt.wantNear-time.Second || age > tt.wantNear+time.Second { + t.Errorf("Age() = %v, want near %v", age, tt.wantNear) + } + }) + } +} + +func TestSample_IsRecent(t *testing.T) { + threshold := 5 * time.Minute + + tests := []struct { + name string + sample Sample + threshold time.Duration + want bool + }{ + { + name: "within threshold", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-2 * time.Minute), + }, + threshold: threshold, + want: true, + }, + { + name: "beyond threshold", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-10 * time.Minute), + }, + threshold: threshold, + want: false, + }, + { + name: "exactly at threshold", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-5 * time.Minute), + }, + threshold: threshold, + want: false, + }, + { + name: "very recent", + sample: Sample{ + MetricName: "test", + Timestamp: time.Now().Add(-10 * time.Second), + }, + threshold: threshold, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.sample.IsRecent(tt.threshold); got != tt.want { + 
t.Errorf("IsRecent() = %v, want %v (age: %v)", got, tt.want, tt.sample.Age()) + } + }) + } +} diff --git a/internal/parser/csv.go b/internal/parser/csv.go new file mode 100644 index 0000000..64d16e5 --- /dev/null +++ b/internal/parser/csv.go @@ -0,0 +1,101 @@ +package parser + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "strconv" + "strings" + "time" + + "epimetheus/internal/metrics" +) + +// CSVParser parses metrics from CSV format +type CSVParser struct{} + +// NewCSVParser creates a new CSV parser +func NewCSVParser() *CSVParser { + return &CSVParser{} +} + +// Parse reads metrics from CSV format +// Format: metric_name,label1=value1;label2=value2,value,timestamp_ms +func (p *CSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var samples []metrics.Sample + + csvReader := csv.NewReader(reader) + csvReader.Comment = '#' + + lineNum := 0 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("line %d: %w", lineNum, err) + } + lineNum++ + + if len(record) < 3 { + continue // Skip invalid records + } + + sample, err := p.parseRecord(record, lineNum) + if err != nil { + continue // Skip records with errors + } + + samples = append(samples, sample) + } + + return samples, nil +} + +func (p *CSVParser) parseRecord(record []string, lineNum int) (metrics.Sample, error) { + metricName := strings.TrimSpace(record[0]) + if metricName == "" { + return metrics.Sample{}, fmt.Errorf("empty metric name") + } + + labels := parseLabels(record[1]) + + value, err := strconv.ParseFloat(strings.TrimSpace(record[2]), 64) + if err != nil { + return metrics.Sample{}, fmt.Errorf("invalid value: %w", err) + } + + timestamp := time.Now() + if len(record) > 3 && record[3] != "" { + timestampMs, err := strconv.ParseInt(strings.TrimSpace(record[3]), 10, 64) + if err == nil { + timestamp = 
time.UnixMilli(timestampMs) + } + } + + return metrics.NewSample(metricName, labels, value, timestamp), nil +} + +func parseLabels(labelStr string) map[string]string { + labels := make(map[string]string) + if labelStr == "" { + return labels + } + + labelPairs := strings.Split(labelStr, ";") + for _, pair := range labelPairs { + parts := strings.SplitN(pair, "=", 2) + if len(parts) == 2 { + labels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) + } + } + return labels +} diff --git a/internal/parser/csv_test.go b/internal/parser/csv_test.go new file mode 100644 index 0000000..ffe9034 --- /dev/null +++ b/internal/parser/csv_test.go @@ -0,0 +1,175 @@ +package parser + +import ( + "context" + "strings" + "testing" + "time" +) + +func TestCSVParser_Parse(t *testing.T) { + tests := []struct { + name string + input string + wantCount int + wantErr bool + }{ + { + name: "valid single line", + input: `test_metric,env=prod;host=server1,42.5,1234567890000`, + wantCount: 1, + wantErr: false, + }, + { + name: "multiple lines", + input: `metric1,label1=value1,100,1234567890000 +metric2,label2=value2,200,1234567891000 +metric3,label3=value3,300,1234567892000`, + wantCount: 3, + wantErr: false, + }, + { + name: "with comments", + input: `# This is a comment +metric1,env=test,50,1234567890000 +# Another comment +metric2,env=prod,75,1234567891000`, + wantCount: 2, + wantErr: false, + }, + { + name: "no timestamp defaults to now", + input: `metric1,env=test,100`, + wantCount: 1, + wantErr: false, + }, + { + name: "no labels", + input: `metric1,,100,1234567890000`, + wantCount: 1, + wantErr: false, + }, + { + name: "empty input", + input: "", + wantCount: 0, + wantErr: false, + }, + { + name: "invalid line causes error", + input: `metric1,env=test,100,1234567890000 +invalid +metric2,env=prod,200,1234567891000`, + wantCount: 0, + wantErr: true, + }, + { + name: "invalid value skipped", + input: `metric1,env=test,not_a_number,1234567890000 
+metric2,env=prod,200,1234567891000`, + wantCount: 1, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := NewCSVParser() + reader := strings.NewReader(tt.input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + if len(samples) != tt.wantCount { + t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount) + } + }) + } +} + +func TestCSVParser_ParseLabels(t *testing.T) { + tests := []struct { + name string + input string + want map[string]string + }{ + { + name: "single label", + input: "env=prod", + want: map[string]string{"env": "prod"}, + }, + { + name: "multiple labels", + input: "env=prod;host=server1;region=us-west", + want: map[string]string{"env": "prod", "host": "server1", "region": "us-west"}, + }, + { + name: "empty string", + input: "", + want: map[string]string{}, + }, + { + name: "invalid label format skipped", + input: "env=prod;invalid;host=server1", + want: map[string]string{"env": "prod", "host": "server1"}, + }, + { + name: "with spaces", + input: " env = prod ; host = server1 ", + want: map[string]string{"env": "prod", "host": "server1"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseLabels(tt.input) + if len(got) != len(tt.want) { + t.Errorf("parseLabels() returned %d labels, want %d", len(got), len(tt.want)) + } + for k, v := range tt.want { + if got[k] != v { + t.Errorf("parseLabels()[%s] = %v, want %v", k, got[k], v) + } + } + }) + } +} + +func TestCSVParser_ParseWithContext(t *testing.T) { + t.Run("context cancellation", func(t *testing.T) { + parser := NewCSVParser() + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + input := strings.NewReader(`metric1,env=test,100,1234567890000`) + _, err := parser.Parse(ctx, input) + + if err != 
context.Canceled { + t.Errorf("Expected context.Canceled error, got %v", err) + } + }) +} + +func TestCSVParser_ParseTimestamp(t *testing.T) { + parser := NewCSVParser() + input := `metric1,env=test,100,1234567890000` + reader := strings.NewReader(input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + expectedTime := time.UnixMilli(1234567890000) + if !samples[0].Timestamp.Equal(expectedTime) { + t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime) + } +} diff --git a/internal/parser/json.go b/internal/parser/json.go new file mode 100644 index 0000000..3b8c2e8 --- /dev/null +++ b/internal/parser/json.go @@ -0,0 +1,62 @@ +package parser + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "epimetheus/internal/metrics" +) + +// JSONParser parses metrics from JSON format +type JSONParser struct{} + +// NewJSONParser creates a new JSON parser +func NewJSONParser() *JSONParser { + return &JSONParser{} +} + +type jsonSample struct { + Metric string `json:"metric"` + Labels map[string]string `json:"labels"` + Value float64 `json:"value"` + TimestampMs int64 `json:"timestamp_ms,omitempty"` +} + +// Parse reads metrics from JSON format +func (p *JSONParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var rawSamples []jsonSample + + decoder := json.NewDecoder(reader) + if err := decoder.Decode(&rawSamples); err != nil { + return nil, fmt.Errorf("failed to parse JSON: %w", err) + } + + samples := make([]metrics.Sample, 0, len(rawSamples)) + for _, raw := range rawSamples { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if raw.Metric == "" { + continue + } + + timestamp := time.Now() + if raw.TimestampMs > 0 { + timestamp = time.UnixMilli(raw.TimestampMs) + } + + if raw.Labels == nil { + raw.Labels = 
make(map[string]string) + } + + samples = append(samples, metrics.NewSample(raw.Metric, raw.Labels, raw.Value, timestamp)) + } + + return samples, nil +} diff --git a/internal/parser/json_test.go b/internal/parser/json_test.go new file mode 100644 index 0000000..d521942 --- /dev/null +++ b/internal/parser/json_test.go @@ -0,0 +1,177 @@ +package parser + +import ( + "context" + "strings" + "testing" + "time" +) + +func TestJSONParser_Parse(t *testing.T) { + tests := []struct { + name string + input string + wantCount int + wantErr bool + }{ + { + name: "valid single sample", + input: `[ + { + "metric": "test_metric", + "labels": {"env": "prod", "host": "server1"}, + "value": 42.5, + "timestamp_ms": 1234567890000 + } + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "multiple samples", + input: `[ + {"metric": "metric1", "labels": {"env": "prod"}, "value": 100, "timestamp_ms": 1234567890000}, + {"metric": "metric2", "labels": {"env": "test"}, "value": 200, "timestamp_ms": 1234567891000}, + {"metric": "metric3", "labels": {"env": "dev"}, "value": 300, "timestamp_ms": 1234567892000} + ]`, + wantCount: 3, + wantErr: false, + }, + { + name: "no timestamp defaults to now", + input: `[ + {"metric": "test_metric", "labels": {"env": "prod"}, "value": 100} + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "no labels", + input: `[ + {"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000} + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "empty metric skipped", + input: `[ + {"metric": "", "labels": {"env": "prod"}, "value": 100}, + {"metric": "valid_metric", "labels": {"env": "test"}, "value": 200} + ]`, + wantCount: 1, + wantErr: false, + }, + { + name: "empty array", + input: `[]`, + wantCount: 0, + wantErr: false, + }, + { + name: "invalid json", + input: `{not valid json}`, + wantCount: 0, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := NewJSONParser() + reader := 
strings.NewReader(tt.input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + if len(samples) != tt.wantCount { + t.Errorf("Parse() returned %d samples, want %d", len(samples), tt.wantCount) + } + }) + } +} + +func TestJSONParser_ParseWithContext(t *testing.T) { + t.Run("context check during parse", func(t *testing.T) { + parser := NewJSONParser() + ctx, cancel := context.WithCancel(context.Background()) + + // Create valid input with empty metrics that will be filtered + input := `[ + {"metric": "", "value": 1}, + {"metric": "", "value": 2}, + {"metric": "", "value": 3} + ]` + + cancel() // Cancel before parsing + + reader := strings.NewReader(input) + _, err := parser.Parse(ctx, reader) + + // Context cancellation should be detected during sample processing + if err != context.Canceled { + // Note: JSON decoder may finish before context is checked + // This test verifies context support exists, but timing is not guaranteed + t.Logf("Got error: %v (context may not be checked until after JSON decode)", err) + } + }) +} + +func TestJSONParser_ParseTimestamp(t *testing.T) { + parser := NewJSONParser() + input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]` + reader := strings.NewReader(input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + expectedTime := time.UnixMilli(1234567890000) + if !samples[0].Timestamp.Equal(expectedTime) { + t.Errorf("Timestamp = %v, want %v", samples[0].Timestamp, expectedTime) + } +} + +func TestJSONParser_ParseLabels(t *testing.T) { + parser := NewJSONParser() + input := `[{ + "metric": "test_metric", + "labels": {"env": "prod", "host": "server1", "region": "us-west"}, + "value": 100 + }]` + reader 
:= strings.NewReader(input) + ctx := context.Background() + + samples, err := parser.Parse(ctx, reader) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + expectedLabels := map[string]string{ + "env": "prod", + "host": "server1", + "region": "us-west", + } + + if len(samples[0].Labels) != len(expectedLabels) { + t.Errorf("Got %d labels, want %d", len(samples[0].Labels), len(expectedLabels)) + } + + for k, v := range expectedLabels { + if samples[0].Labels[k] != v { + t.Errorf("Label[%s] = %v, want %v", k, samples[0].Labels[k], v) + } + } +} diff --git a/internal/parser/parser.go b/internal/parser/parser.go new file mode 100644 index 0000000..860baa3 --- /dev/null +++ b/internal/parser/parser.go @@ -0,0 +1,56 @@ +package parser + +import ( + "context" + "fmt" + "io" + "os" + + "epimetheus/internal/metrics" +) + +// Parser defines the interface for metric parsers. +type Parser interface { + Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) +} + +// ParseFile parses metrics from a file. +func ParseFile(ctx context.Context, filename, format string) ([]metrics.Sample, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + return parseWithFormat(ctx, file, format) +} + +// ParseStdin parses metrics from standard input. +func ParseStdin(ctx context.Context, format string) ([]metrics.Sample, error) { + return parseWithFormat(ctx, os.Stdin, format) +} + +// parseWithFormat parses metrics using the specified format. 
+func parseWithFormat(ctx context.Context, reader io.Reader, format string) ([]metrics.Sample, error) { + var parser Parser + + switch format { + case "csv": + parser = NewCSVParser() + case "json": + parser = NewJSONParser() + default: + return nil, fmt.Errorf("unsupported format: %s (use csv or json)", format) + } + + samples, err := parser.Parse(ctx, reader) + if err != nil { + return nil, fmt.Errorf("failed to parse metrics: %w", err) + } + + if len(samples) == 0 { + return nil, fmt.Errorf("no valid samples found") + } + + return samples, nil +} diff --git a/internal/parser/parser_test.go b/internal/parser/parser_test.go new file mode 100644 index 0000000..05255a5 --- /dev/null +++ b/internal/parser/parser_test.go @@ -0,0 +1,99 @@ +package parser + +import ( + "context" + "strings" + "testing" +) + +func TestParseFile_CSV(t *testing.T) { + // We can't easily test file operations without creating temp files + // So we'll test the error case + ctx := context.Background() + _, err := ParseFile(ctx, "/nonexistent/file.csv", "csv") + + if err == nil { + t.Error("Expected error for nonexistent file") + } +} + +func TestParseWithFormat_CSV(t *testing.T) { + ctx := context.Background() + input := `test_metric,env=prod,100,1234567890000` + reader := strings.NewReader(input) + + samples, err := parseWithFormat(ctx, reader, "csv") + if err != nil { + t.Fatalf("parseWithFormat(csv) error = %v", err) + } + if len(samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(samples)) + } +} + +func TestParseWithFormat_JSON(t *testing.T) { + ctx := context.Background() + input := `[{"metric": "test_metric", "value": 100, "timestamp_ms": 1234567890000}]` + reader := strings.NewReader(input) + + samples, err := parseWithFormat(ctx, reader, "json") + if err != nil { + t.Fatalf("parseWithFormat(json) error = %v", err) + } + if len(samples) != 1 { + t.Errorf("Expected 1 sample, got %d", len(samples)) + } +} + +func TestParseWithFormat_UnsupportedFormat(t *testing.T) { + ctx := 
context.Background() + reader := strings.NewReader("") + + _, err := parseWithFormat(ctx, reader, "xml") + if err == nil { + t.Error("Expected error for unsupported format") + } + if err.Error() != "unsupported format: xml (use csv or json)" { + t.Errorf("Unexpected error message: %v", err) + } +} + +func TestParseWithFormat_EmptyResult(t *testing.T) { + ctx := context.Background() + input := `[]` // Empty JSON array + reader := strings.NewReader(input) + + _, err := parseWithFormat(ctx, reader, "json") + if err == nil { + t.Error("Expected error for empty samples") + } + if err.Error() != "no valid samples found" { + t.Errorf("Expected 'no valid samples found' error, got: %v", err) + } +} + +func TestParseStdin_Format(t *testing.T) { + // We can't easily test stdin without mocking, + // but we can verify the error path + ctx := context.Background() + + // Test with invalid format + _, err := parseWithFormat(ctx, strings.NewReader(""), "invalid_format") + if err == nil { + t.Error("Expected error for invalid format") + } +} + +func TestNewCSVParser(t *testing.T) { + parser := NewCSVParser() + if parser == nil { + t.Error("NewCSVParser() returned nil") + } +} + +func TestNewJSONParser(t *testing.T) { + parser := NewJSONParser() + if parser == nil { + t.Error("NewJSONParser() returned nil") + } +} diff --git a/internal/parser/tabular_csv.go b/internal/parser/tabular_csv.go new file mode 100644 index 0000000..bfa4fbe --- /dev/null +++ b/internal/parser/tabular_csv.go @@ -0,0 +1,256 @@ +package parser + +import ( + "context" + "encoding/csv" + "fmt" + "io" + "log" + "regexp" + "strconv" + "strings" + "time" + + "epimetheus/internal/metrics" + "epimetheus/internal/resolver" +) + +// TabularCSVParser parses tabular CSV files where the first row is headers +// and all subsequent rows are data. Numeric columns become metrics, string columns become labels. 
+type TabularCSVParser struct { + metricName string + timestamp time.Time + resolver *resolver.DNSResolver // DNS resolver for IP addresses + resolveLabels map[string]bool // Set of label names to resolve via DNS +} + +// SaveDNSCache saves the DNS cache to disk (called after successful ingestion) +func (p *TabularCSVParser) SaveDNSCache() { + if p.resolver != nil { + cacheSize := p.resolver.GetCacheSize() + if err := p.resolver.SaveCache(); err != nil { + // Log error but don't fail - cache saving is not critical + log.Printf("⚠️ Failed to save DNS cache: %v", err) + } else if cacheSize > 0 { + log.Printf("💾 Saved DNS cache with %d entries", cacheSize) + } + } +} + +// numericLabelColumns is a list of column names that should be treated as labels +// even if they contain numeric values. This is useful for categorical codes like +// HTTP status codes (200, 404, 500) or date periods (20160101, 20160102) that should be labels, not metrics. +// +// Column names are matched case-insensitively. 
+// +// Common examples: +// - responsecode: HTTP response codes (200, 404, 500) +// - statuscode: Application status codes +// - status: Status identifiers +// - code: Generic codes +// - period: Date periods (20160101, 20160102, YYYYMMDD format) +var numericLabelColumns = map[string]bool{ + "responsecode": true, + "statuscode": true, + "status_code": true, + "response_code": true, + "status": true, + "code": true, + "period": true, +} + +// NewTabularCSVParser creates a new tabular CSV parser +// metricName is the name of the metric to create +// timestamp is the timestamp to use for all samples +// resolveIPLabels is a list of label names containing IP addresses to resolve via DNS +func NewTabularCSVParser(metricName string, timestamp time.Time, resolveIPLabels []string) *TabularCSVParser { + // Build set for O(1) lookup + resolveMap := make(map[string]bool) + for _, label := range resolveIPLabels { + resolveMap[label] = true + } + + return &TabularCSVParser{ + metricName: metricName, + timestamp: timestamp, + resolver: resolver.NewDNSResolver(), + resolveLabels: resolveMap, + } +} + +// Parse reads metrics from a tabular CSV format +// First row: column headers +// Subsequent rows: data values +// All columns become labels (numeric values are converted to strings) +func (p *TabularCSVParser) Parse(ctx context.Context, reader io.Reader) ([]metrics.Sample, error) { + var samples []metrics.Sample + + csvReader := csv.NewReader(reader) + csvReader.Comment = '#' + csvReader.TrimLeadingSpace = true + csvReader.FieldsPerRecord = -1 // Allow variable number of fields + + // Read header row + headers, err := csvReader.Read() + if err != nil { + return nil, fmt.Errorf("failed to read header row: %w", err) + } + + if len(headers) == 0 { + return nil, fmt.Errorf("empty header row") + } + + // Trim whitespace and sanitize headers for Prometheus label compatibility + for i := range headers { + headers[i] = sanitizeLabelName(strings.TrimSpace(headers[i])) + } + + // Read data 
rows + rowNum := 1 + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + record, err := csvReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("line %d: %w", rowNum+1, err) + } + rowNum++ + + if len(record) != len(headers) { + // Skip rows with mismatched column counts + continue + } + + // Add microsecond offset to ensure unique timestamps per row + // This prevents duplicate timestamp errors when multiple rows + // have the same label combinations + rowTimestamp := p.timestamp.Add(time.Duration(rowNum) * time.Microsecond) + + rowSamples := p.parseRecord(headers, record, rowTimestamp) + samples = append(samples, rowSamples...) + } + + return samples, nil +} + +// parseRecord converts a single CSV row into multiple Samples +// Numeric columns become separate metrics, string columns become labels. +// +// Exception: Column names in numericLabelColumns are always treated as labels, +// even if they contain numbers (e.g., responsecode: 200, 404, 500). +// +// DNS Resolution: Label values matching IP addresses in resolveLabels will be +// resolved to hostnames via reverse DNS lookup. 
+func (p *TabularCSVParser) parseRecord(headers []string, record []string, timestamp time.Time) []metrics.Sample { + var samples []metrics.Sample + + // First pass: identify numeric vs string columns and collect label values + labels := make(map[string]string) + var numericCols []struct { + name string + value float64 + } + + for i, header := range headers { + if i >= len(record) { + continue + } + + value := strings.TrimSpace(record[i]) + + // Check if this column is in the exception list (should be label even if numeric) + headerLower := strings.ToLower(header) + if numericLabelColumns[headerLower] { + // This column should always be a label, even if it's numeric + labels[header] = value + continue + } + + // Try to parse as number + if floatVal, err := strconv.ParseFloat(value, 64); err == nil { + // It's a numeric column - will become a metric + numericCols = append(numericCols, struct { + name string + value float64 + }{name: header, value: floatVal}) + } else { + // It's a string column - becomes a label + labels[header] = value + } + } + + // Resolve IP addresses to hostnames + for labelName := range p.resolveLabels { + if ipValue, exists := labels[labelName]; exists { + // Attempt DNS resolution + if hostname, ok := p.resolver.ResolveIP(ipValue); ok && hostname != "" { + // Replace IP with hostname + labels[labelName] = hostname + } + // If resolution failed, ipValue remains unchanged + } + } + + // Create one sample per numeric column + for _, numCol := range numericCols { + metricName := p.metricName + "_" + numCol.name + // Copy labels for each metric + metricLabels := make(map[string]string, len(labels)) + for k, v := range labels { + metricLabels[k] = v + } + samples = append(samples, metrics.NewSample(metricName, metricLabels, numCol.value, timestamp)) + } + + return samples +} + +// sanitizeLabelName converts a string to a valid Prometheus label name +// Prometheus labels must match [a-zA-Z_][a-zA-Z0-9_]* +// Examples: +// "avg(totaltime)" -> 
"avg_totaltime" +// "sum(rcv)" -> "sum_rcv" +// "response-code" -> "response_code" +// "123invalid" -> "label_123invalid" +func sanitizeLabelName(name string) string { + if name == "" { + return "unknown" + } + + // Replace invalid characters with underscore + re := regexp.MustCompile(`[^a-zA-Z0-9_]`) + sanitized := re.ReplaceAllString(name, "_") + + // Ensure it starts with a letter or underscore + if len(sanitized) > 0 && sanitized[0] >= '0' && sanitized[0] <= '9' { + sanitized = "label_" + sanitized + } + + // Remove consecutive underscores + sanitized = regexp.MustCompile(`_+`).ReplaceAllString(sanitized, "_") + + // Remove leading/trailing underscores + sanitized = strings.Trim(sanitized, "_") + + if sanitized == "" { + return "unknown" + } + + return sanitized +} + +// ParseFloat64 tries to parse a string as float64, returns 0.0 if fails +func parseFloat64(s string) float64 { + f, err := strconv.ParseFloat(strings.TrimSpace(s), 64) + if err != nil { + return 0.0 + } + return f +} diff --git a/internal/parser/tabular_csv_test.go b/internal/parser/tabular_csv_test.go new file mode 100644 index 0000000..07818e8 --- /dev/null +++ b/internal/parser/tabular_csv_test.go @@ -0,0 +1,469 @@ +package parser + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "epimetheus/internal/metrics" +) + +func TestTabularCSVParser_Parse(t *testing.T) { + tests := []struct { + name string + input string + metricName string + expectedCount int + wantErr bool + }{ + { + name: "simple tabular CSV with numeric and text columns", + input: `responsecode,httpmethod,user,totaltime +200,GET,alice,50.5 +404,POST,bob,100.2 +500,GET,charlie,75.0`, + metricName: "test_metric", + expectedCount: 6, // 3 rows * 2 numeric columns (responsecode, totaltime) + wantErr: false, + }, + { + name: "CSV with mixed data types", + input: `col1,col2,col3 +1,text,3.14 +2,more,2.71 +3,data,1.41`, + metricName: "mixed_metric", + expectedCount: 6, // 3 rows * 2 numeric columns (col1, col3) + 
wantErr: false, + }, + { + name: "CSV with whitespace", + input: ` col1 , col2 , col3 + 1 , value2 , 3 + 4 , value5 , 6 `, + metricName: "whitespace_metric", + expectedCount: 4, // 2 rows * 2 numeric columns (col1, col3) + wantErr: false, + }, + { + name: "CSV with comments", + input: `# This is a comment +col1,col2,col3 +# Another comment +1,value2,3`, + metricName: "comment_metric", + expectedCount: 2, // 1 row * 2 numeric columns (col1, col3) + wantErr: false, + }, + { + name: "empty CSV", + input: "", + metricName: "empty_metric", + expectedCount: 0, + wantErr: true, // No header row + }, + { + name: "header only", + input: `col1,col2,col3 +`, + metricName: "header_only", + expectedCount: 0, + wantErr: false, + }, + { + name: "mismatched columns - skipped", + input: `col1,col2,col3 +1,value2,3 +value4,value5 +6,value7,8`, + metricName: "mismatched_metric", + expectedCount: 4, // 2 matching rows * 2 numeric columns + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + parser := NewTabularCSVParser(tt.metricName, timestamp, []string{}) // No DNS resolution + + reader := strings.NewReader(tt.input) + samples, err := parser.Parse(ctx, reader) + + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if err == nil && len(samples) != tt.expectedCount { + t.Errorf("Parse() got %d samples, want %d", len(samples), tt.expectedCount) + } + + // Verify all samples have the correct base metric name and timestamp + for _, sample := range samples { + if !strings.HasPrefix(sample.MetricName, tt.metricName+"_") { + t.Errorf("Sample metric name = %s, want prefix %s_", sample.MetricName, tt.metricName) + } + if !sample.Timestamp.Equal(timestamp) { + t.Errorf("Sample timestamp = %v, want %v", sample.Timestamp, timestamp) + } + } + }) + } +} + +func TestTabularCSVParser_Labels(t *testing.T) { + ctx := context.Background() + 
timestamp := time.Now() + input := `responsecode,httpmethod,user,totaltime +200,GET,alice,50.5 +404,POST,bob,100.2` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // 2 rows * 2 numeric columns (responsecode, totaltime) = 4 samples + if len(samples) != 4 { + t.Fatalf("Expected 4 samples, got %d", len(samples)) + } + + // Find the responsecode and totaltime metrics for first row + var responsecodeMetric, totaltimeMetric *metrics.Sample + for i := range samples { + if samples[i].Labels["httpmethod"] == "GET" && samples[i].Labels["user"] == "alice" { + if strings.HasSuffix(samples[i].MetricName, "_responsecode") { + responsecodeMetric = &samples[i] + } else if strings.HasSuffix(samples[i].MetricName, "_totaltime") { + totaltimeMetric = &samples[i] + } + } + } + + if responsecodeMetric == nil || totaltimeMetric == nil { + t.Fatalf("Could not find expected metrics") + } + + // Check responsecode metric + if responsecodeMetric.Value != 200 { + t.Errorf("responsecode value = %f, want 200", responsecodeMetric.Value) + } + expectedLabels := map[string]string{ + "httpmethod": "GET", + "user": "alice", + } + for key, expectedValue := range expectedLabels { + if actualValue, ok := responsecodeMetric.Labels[key]; !ok { + t.Errorf("responsecode metric missing label %s", key) + } else if actualValue != expectedValue { + t.Errorf("responsecode metric label %s = %s, want %s", key, actualValue, expectedValue) + } + } + + // Check totaltime metric + if totaltimeMetric.Value != 50.5 { + t.Errorf("totaltime value = %f, want 50.5", totaltimeMetric.Value) + } + for key, expectedValue := range expectedLabels { + if actualValue, ok := totaltimeMetric.Labels[key]; !ok { + t.Errorf("totaltime metric missing label %s", key) + } else if actualValue != expectedValue { + t.Errorf("totaltime metric label %s = %s, want %s", key, 
actualValue, expectedValue) + } + } +} + +func TestTabularCSVParser_ContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + timestamp := time.Now() + input := `col1,col2,col3 +value1,value2,value3 +value4,value5,value6` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{}) + reader := strings.NewReader(input) + _, err := parser.Parse(ctx, reader) + + if err != context.Canceled { + t.Errorf("Expected context.Canceled error, got %v", err) + } +} + +func TestTabularCSVParser_LargeFile(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // Generate a CSV with 1000 rows, 1 numeric column + var builder strings.Builder + builder.WriteString("col1,col2,col3\n") + for i := 0; i < 1000; i++ { + builder.WriteString(fmt.Sprintf("%d,value2,value3\n", i)) + } + + parser := NewTabularCSVParser("large_metric", timestamp, []string{}) + reader := strings.NewReader(builder.String()) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // 1000 rows * 1 numeric column = 1000 samples + if len(samples) != 1000 { + t.Errorf("Expected 1000 samples, got %d", len(samples)) + } +} + +func TestSanitizeLabelName(t *testing.T) { + tests := []struct { + input string + expected string + }{ + {"avg(totaltime)", "avg_totaltime"}, + {"sum(rcv)", "sum_rcv"}, + {"response-code", "response_code"}, + {"valid_label", "valid_label"}, + {"ValidLabel123", "ValidLabel123"}, + {"123invalid", "label_123invalid"}, + {"label__with___underscores", "label_with_underscores"}, + {"_leading", "leading"}, + {"trailing_", "trailing"}, + {"special!@#$chars", "special_chars"}, + {"", "unknown"}, + {"___", "unknown"}, + {"http.method", "http_method"}, + {"status_code", "status_code"}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + result := sanitizeLabelName(tt.input) + if result != tt.expected { + 
t.Errorf("sanitizeLabelName(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} + +func TestTabularCSVParser_WithSpecialCharacterHeaders(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + input := `avg(totaltime),sum(rcv),response-code,http.method +50.5,1102,200,GET +100.2,2204,404,POST` + + parser := NewTabularCSVParser("special_metric", timestamp, []string{}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // 2 rows * 3 numeric columns (avg(totaltime), sum(rcv), response-code) = 6 samples + if len(samples) != 6 { + t.Fatalf("Expected 6 samples, got %d", len(samples)) + } + + // Check that metric names are sanitized + expectedMetrics := []string{"special_metric_avg_totaltime", "special_metric_sum_rcv", "special_metric_response_code"} + foundMetrics := make(map[string]bool) + for _, sample := range samples { + foundMetrics[sample.MetricName] = true + } + for _, expected := range expectedMetrics { + if !foundMetrics[expected] { + t.Errorf("Missing expected metric %s", expected) + } + } + + // Check that string column becomes label (http.method -> http_method) + for _, sample := range samples { + if _, ok := sample.Labels["http_method"]; !ok { + t.Errorf("Sample missing sanitized label http_method") + } + } + + // Check values are correct for first row + for _, sample := range samples { + if sample.Labels["http_method"] == "GET" { + if strings.HasSuffix(sample.MetricName, "_avg_totaltime") { + if sample.Value != 50.5 { + t.Errorf("avg_totaltime value = %f, want 50.5", sample.Value) + } + } else if strings.HasSuffix(sample.MetricName, "_sum_rcv") { + if sample.Value != 1102 { + t.Errorf("sum_rcv value = %f, want 1102", sample.Value) + } + } else if strings.HasSuffix(sample.MetricName, "_response_code") { + if sample.Value != 200 { + t.Errorf("response_code value = %f, want 200", sample.Value) + } + } + } + } +} + +func 
TestTabularCSVParser_DNSResolution(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // Test with localhost IP which should resolve on most systems + input := `ip,responsecode,count +127.0.0.1,200,100` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // Should have 1 sample (count metric) + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // The ip label should either be resolved to a hostname or remain as IP if DNS failed + ipLabel := samples[0].Labels["ip"] + if ipLabel == "" { + t.Error("ip label is empty") + } + + // If resolution succeeded, it should not be the original IP + // If it failed, it should still be the IP + t.Logf("IP label value: %s (original: 127.0.0.1)", ipLabel) +} + +func TestTabularCSVParser_DNSResolutionDisabled(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + input := `ip,responsecode,count +192.168.1.1,200,100` + + // Pass empty slice - no DNS resolution + parser := NewTabularCSVParser("test_metric", timestamp, []string{}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // IP should remain unchanged + ipLabel := samples[0].Labels["ip"] + if ipLabel != "192.168.1.1" { + t.Errorf("IP label = %s, want 192.168.1.1 (DNS resolution should be disabled)", ipLabel) + } +} + +func TestTabularCSVParser_DNSResolutionMultipleLabels(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + input := `source_ip,dest_ip,count +127.0.0.1,192.168.1.1,100` + + // Resolve both source_ip and dest_ip + parser := NewTabularCSVParser("test_metric", timestamp, []string{"source_ip", "dest_ip"}) + reader := 
strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // Both labels should be present + if _, ok := samples[0].Labels["source_ip"]; !ok { + t.Error("source_ip label missing") + } + if _, ok := samples[0].Labels["dest_ip"]; !ok { + t.Error("dest_ip label missing") + } + + t.Logf("source_ip: %s", samples[0].Labels["source_ip"]) + t.Logf("dest_ip: %s", samples[0].Labels["dest_ip"]) +} + +func TestTabularCSVParser_DNSResolutionNonIPValue(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // ip column contains non-IP value + input := `ip,responsecode,count +not-an-ip,200,100` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if len(samples) != 1 { + t.Fatalf("Expected 1 sample, got %d", len(samples)) + } + + // Non-IP value should remain unchanged + ipLabel := samples[0].Labels["ip"] + if ipLabel != "not-an-ip" { + t.Errorf("IP label = %s, want not-an-ip (should not resolve non-IP values)", ipLabel) + } +} + +func TestTabularCSVParser_DNSResolutionCaching(t *testing.T) { + ctx := context.Background() + timestamp := time.Now() + + // Multiple rows with same IP + input := `ip,responsecode,count +127.0.0.1,200,100 +127.0.0.1,404,50 +127.0.0.1,500,5` + + parser := NewTabularCSVParser("test_metric", timestamp, []string{"ip"}) + reader := strings.NewReader(input) + samples, err := parser.Parse(ctx, reader) + + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + // Should have 3 samples + if len(samples) != 3 { + t.Fatalf("Expected 3 samples, got %d", len(samples)) + } + + // All should have same resolved IP (cached result) + firstIP := samples[0].Labels["ip"] + for i, sample := range samples { + if 
sample.Labels["ip"] != firstIP { + t.Errorf("Sample %d has different IP label: %s vs %s (caching issue)", i, sample.Labels["ip"], firstIP) + } + } + + // Check cache size + if parser.resolver.GetCacheSize() != 1 { + t.Errorf("Expected cache size 1 (one unique IP), got %d", parser.resolver.GetCacheSize()) + } +} diff --git a/internal/resolver/dns_resolver.go b/internal/resolver/dns_resolver.go new file mode 100644 index 0000000..22ab184 --- /dev/null +++ b/internal/resolver/dns_resolver.go @@ -0,0 +1,274 @@ +package resolver + +import ( + "encoding/json" + "log" + "math/rand" + "net" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +// DNSResolver provides thread-safe reverse DNS resolution with caching and rate limiting +type DNSResolver struct { + cache map[string]*DNSCacheEntry // IP -> cache entry with metadata + mu sync.RWMutex + rateLimiter chan struct{} // Channel for rate limiting DNS lookups + cacheFile string // Path to persistent cache file +} + +// DNSCacheEntry represents a cached DNS resolution with timestamp +type DNSCacheEntry struct { + IP string `json:"ip"` + Hostname string `json:"hostname"` + CachedAt time.Time `json:"cached_at"` + LastRetry time.Time `json:"last_retry,omitempty"` // Last retry attempt for failed lookups + RetryCount int `json:"retry_count,omitempty"` // Number of retry attempts +} + +const ( + // CacheTTL is the default time-to-live for cache entries (7 days) + CacheTTL = 7 * 24 * time.Hour + // FailedLookupRetryInterval is how often to retry failed DNS lookups (1 minute) + FailedLookupRetryInterval = 1 * time.Minute + // MaxRetries is the maximum number of retries for failed lookups + MaxRetries = 5 +) + +// NewDNSResolver creates a new DNS resolver with an empty cache +// Rate limited to ~100 DNS lookups per second to avoid overwhelming DNS servers +// Loads persistent cache from disk if available +func NewDNSResolver() *DNSResolver { + // Determine cache file location + cacheDir := os.Getenv("HOME") + if cacheDir 
== "" { + cacheDir = "/tmp" + } + cacheFile := filepath.Join(cacheDir, ".epimetheus-dns-cache.json") + + resolver := &DNSResolver{ + cache: make(map[string]*DNSCacheEntry), + rateLimiter: make(chan struct{}, 10), // Allow 10 concurrent lookups + cacheFile: cacheFile, + } + + // Load persistent cache from disk + resolver.loadCache() + + // Clean expired entries on startup + resolver.cleanExpiredEntries() + + // Start rate limiter goroutine that releases tokens every 10ms (100 per second) + go func() { + ticker := time.NewTicker(10 * time.Millisecond) // 100 requests/second + defer ticker.Stop() + for range ticker.C { + select { + case resolver.rateLimiter <- struct{}{}: + default: + // Channel full, skip this tick + } + } + }() + + return resolver +} + +// ResolveIP attempts to resolve an IP address to a hostname +// Returns (hostname, true) if successful, ("", false) if resolution failed or not an IP +// Results are cached to avoid repeated DNS lookups +// Failed lookups are retried after FailedLookupRetryInterval +func (r *DNSResolver) ResolveIP(ip string) (string, bool) { + // Validate it's an IP address first + if !isIPAddress(ip) { + return "", false + } + + // Check cache first (read lock) + r.mu.RLock() + entry, cached := r.cache[ip] + r.mu.RUnlock() + + if cached { + // Check if this is a failed lookup that should be retried + if entry.Hostname == "" && entry.RetryCount < MaxRetries { + // Check if enough time has passed since last retry + if time.Since(entry.LastRetry) >= FailedLookupRetryInterval { + // Retry the lookup + hostname := r.retryLookup(ip, entry) + return hostname, hostname != "" + } + } + + // Return cached result + return entry.Hostname, entry.Hostname != "" + } + + // Cache miss - resolve and cache the result + hostname := r.resolveAndCache(ip) + return hostname, hostname != "" +} + +// resolveAndCache performs DNS lookup and caches the result +// Returns hostname on success, empty string on failure +// Rate limited to prevent overwhelming 
DNS servers
func (r *DNSResolver) resolveAndCache(ip string) string {
	// Wait for rate limiter token (refilled at ~100/second by the goroutine
	// started in NewDNSResolver)
	<-r.rateLimiter

	// Perform reverse DNS lookup. Done without holding the mutex so a slow
	// resolver cannot stall other cache readers; if two goroutines race on
	// the same uncached IP, the last writer wins — harmless duplicate work.
	names, err := net.LookupAddr(ip)

	var hostname string
	now := time.Now()

	entry := &DNSCacheEntry{
		IP:       ip,
		CachedAt: now,
	}

	if err == nil && len(names) > 0 {
		// Take first result and remove trailing dot
		hostname = strings.TrimSuffix(names[0], ".")
		entry.Hostname = hostname
	} else {
		// Failed lookup - log and set up for retry. Failures are cached too
		// (with Hostname == ""), so repeated misses do not hammer DNS.
		if err != nil {
			log.Printf("⚠️ DNS resolution failed for %s: %v (will retry)", ip, err)
		} else {
			log.Printf("⚠️ DNS resolution failed for %s: no PTR record found (will retry)", ip)
		}
		entry.LastRetry = now
		entry.RetryCount = 0
	}

	// Cache the result (write lock). The entry is fully populated before
	// insertion and never mutated afterwards.
	r.mu.Lock()
	r.cache[ip] = entry
	r.mu.Unlock()

	return hostname
}

// retryLookup retries a failed DNS lookup
// oldEntry is the previously cached failure; on any outcome the map slot is
// replaced with a new entry rather than mutating oldEntry in place.
func (r *DNSResolver) retryLookup(ip string, oldEntry *DNSCacheEntry) string {
	// Wait for rate limiter token
	<-r.rateLimiter

	// Perform reverse DNS lookup
	names, err := net.LookupAddr(ip)

	var hostname string
	now := time.Now()

	r.mu.Lock()
	defer r.mu.Unlock()

	if err == nil && len(names) > 0 {
		// Successful retry!
		hostname = strings.TrimSuffix(names[0], ".")
		log.Printf("✅ DNS retry successful for %s -> %s (after %d attempts)", ip, hostname, oldEntry.RetryCount+1)
		r.cache[ip] = &DNSCacheEntry{
			IP:       ip,
			Hostname: hostname,
			CachedAt: now,
		}
	} else {
		// Still failed - increment retry count
		newRetryCount := oldEntry.RetryCount + 1
		if err != nil {
			log.Printf("⚠️ DNS retry %d/%d failed for %s: %v", newRetryCount, MaxRetries, ip, err)
		} else {
			log.Printf("⚠️ DNS retry %d/%d failed for %s: no PTR record found", newRetryCount, MaxRetries, ip)
		}

		// CachedAt is preserved from the original failure so TTL-based
		// cleanup is measured from the first attempt, not the latest retry.
		r.cache[ip] = &DNSCacheEntry{
			IP:         ip,
			Hostname:   "",
			CachedAt:   oldEntry.CachedAt,
			LastRetry:  now,
			RetryCount: newRetryCount,
		}

		// Log if we've exhausted all retries
		if newRetryCount >= MaxRetries {
			log.Printf("❌ Giving up on DNS resolution for %s after %d attempts", ip, MaxRetries)
		}
	}

	return hostname
}

// isIPAddress checks if a string is a valid IP address
// (either IPv4 or IPv6, per net.ParseIP)
func isIPAddress(s string) bool {
	return net.ParseIP(s) != nil
}

// GetCacheSize returns the current cache size (useful for debugging/testing)
func (r *DNSResolver) GetCacheSize() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.cache)
}

// loadCache loads the persistent DNS cache from disk
// Any read or decode error is treated as "no cache" and silently ignored,
// since the cache is a best-effort optimization.
func (r *DNSResolver) loadCache() {
	data, err := os.ReadFile(r.cacheFile)
	if err != nil {
		// Cache file doesn't exist or can't be read - start with empty cache
		return
	}

	var entries []DNSCacheEntry
	if err := json.Unmarshal(data, &entries); err != nil {
		// Invalid cache file - start with empty cache
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// Load entries into cache. Indexing (&entries[i]) takes the address of
	// each distinct slice element, so every cached pointer is unique.
	for i := range entries {
		r.cache[entries[i].IP] = &entries[i]
	}
}

// SaveCache saves the current DNS cache to disk
// A value snapshot is taken under the read lock so marshaling and file I/O
// happen without holding the mutex.
func (r *DNSResolver) SaveCache() error {
	r.mu.RLock()
	entries := make([]DNSCacheEntry, 0, len(r.cache))
	for _, entry := range r.cache {
		entries = append(entries, *entry)
	}
	r.mu.RUnlock()

data, err := json.MarshalIndent(entries, "", " ") + if err != nil { + return err + } + + return os.WriteFile(r.cacheFile, data, 0644) +} + +// cleanExpiredEntries removes entries older than CacheTTL +// Adds randomization to prevent all entries expiring at once +func (r *DNSResolver) cleanExpiredEntries() { + r.mu.Lock() + defer r.mu.Unlock() + + now := time.Now() + for ip, entry := range r.cache { + // Add random jitter (±20%) to TTL to prevent thundering herd + jitter := time.Duration(float64(CacheTTL) * 0.2 * (0.5 - rand.Float64() * 2)) + effectiveTTL := CacheTTL + jitter + + age := now.Sub(entry.CachedAt) + if age > effectiveTTL { + delete(r.cache, ip) + } + } +} diff --git a/internal/resolver/dns_resolver_test.go b/internal/resolver/dns_resolver_test.go new file mode 100644 index 0000000..6eab3a7 --- /dev/null +++ b/internal/resolver/dns_resolver_test.go @@ -0,0 +1,232 @@ +package resolver + +import ( + "sync" + "testing" +) + +func TestNewDNSResolver(t *testing.T) { + resolver := NewDNSResolver() + if resolver == nil { + t.Fatal("NewDNSResolver returned nil") + } + if resolver.cache == nil { + t.Fatal("cache map not initialized") + } + if len(resolver.cache) != 0 { + t.Errorf("expected empty cache, got size %d", len(resolver.cache)) + } +} + +func TestIsIPAddress(t *testing.T) { + tests := []struct { + input string + expected bool + }{ + {"192.168.1.1", true}, + {"10.0.0.1", true}, + {"255.255.255.255", true}, + {"2001:db8::1", true}, + {"::1", true}, + {"not-an-ip", false}, + {"", false}, + {"256.1.1.1", false}, + {"192.168.1", false}, + {"server.example.com", false}, + } + + for _, tt := range tests { + result := isIPAddress(tt.input) + if result != tt.expected { + t.Errorf("isIPAddress(%q) = %v, want %v", tt.input, result, tt.expected) + } + } +} + +func TestResolveIP_InvalidIP(t *testing.T) { + resolver := NewDNSResolver() + + invalidInputs := []string{ + "not-an-ip", + "server.example.com", + "", + "invalid", + } + + for _, input := range invalidInputs { 
+ hostname, ok := resolver.ResolveIP(input) + if ok { + t.Errorf("ResolveIP(%q) returned ok=true for invalid IP", input) + } + if hostname != "" { + t.Errorf("ResolveIP(%q) returned hostname=%q for invalid IP", input, hostname) + } + } +} + +func TestResolveIP_Localhost(t *testing.T) { + resolver := NewDNSResolver() + + // Test localhost resolution (should work on most systems) + hostname, ok := resolver.ResolveIP("127.0.0.1") + + // We expect either success with "localhost" or failure (depending on system DNS config) + // Just verify the function doesn't panic and caches the result + if ok && hostname == "" { + t.Error("ResolveIP returned ok=true but empty hostname") + } + + // Verify caching - second call should hit cache + initialCacheSize := resolver.GetCacheSize() + if initialCacheSize != 1 { + t.Errorf("expected cache size 1 after first lookup, got %d", initialCacheSize) + } + + // Second lookup should use cache + hostname2, ok2 := resolver.ResolveIP("127.0.0.1") + if hostname2 != hostname || ok2 != ok { + t.Error("second lookup returned different result (cache not working)") + } + + // Cache size should still be 1 + if resolver.GetCacheSize() != 1 { + t.Errorf("cache size changed on second lookup") + } +} + +func TestResolveIP_CachingFailedLookup(t *testing.T) { + resolver := NewDNSResolver() + + // Use a private IP that likely won't resolve + nonExistentIP := "192.168.255.254" + + // First lookup + hostname1, ok1 := resolver.ResolveIP(nonExistentIP) + + // Verify it's cached (even if it failed) + if resolver.GetCacheSize() != 1 { + t.Errorf("expected cache size 1 after first lookup, got %d", resolver.GetCacheSize()) + } + + // Second lookup should return same result from cache + hostname2, ok2 := resolver.ResolveIP(nonExistentIP) + + if hostname1 != hostname2 { + t.Errorf("hostname changed between lookups: %q vs %q", hostname1, hostname2) + } + if ok1 != ok2 { + t.Errorf("ok changed between lookups: %v vs %v", ok1, ok2) + } + + // Cache size should still be 
1 + if resolver.GetCacheSize() != 1 { + t.Errorf("cache size should remain 1, got %d", resolver.GetCacheSize()) + } +} + +func TestResolveIP_MultipleIPs(t *testing.T) { + resolver := NewDNSResolver() + + ips := []string{ + "127.0.0.1", + "192.168.1.1", + "10.0.0.1", + } + + // Resolve all IPs + for _, ip := range ips { + resolver.ResolveIP(ip) + } + + // Verify all are cached + if resolver.GetCacheSize() != len(ips) { + t.Errorf("expected cache size %d, got %d", len(ips), resolver.GetCacheSize()) + } + + // Resolve again to verify cache hits + for _, ip := range ips { + resolver.ResolveIP(ip) + } + + // Cache size should not change + if resolver.GetCacheSize() != len(ips) { + t.Errorf("cache size changed on second pass") + } +} + +func TestResolveIP_Concurrent(t *testing.T) { + resolver := NewDNSResolver() + + // Test concurrent access to ensure thread safety + var wg sync.WaitGroup + numGoroutines := 50 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + // Each goroutine resolves a few IPs + ips := []string{ + "127.0.0.1", + "192.168.1.1", + "10.0.0.1", + } + + for _, ip := range ips { + resolver.ResolveIP(ip) + } + }(i) + } + + wg.Wait() + + // Despite concurrent access, cache should only have unique IPs + expectedSize := 3 + if resolver.GetCacheSize() != expectedSize { + t.Errorf("expected cache size %d after concurrent access, got %d", + expectedSize, resolver.GetCacheSize()) + } +} + +func TestResolveIP_IPv6(t *testing.T) { + resolver := NewDNSResolver() + + // Test IPv6 localhost + hostname, ok := resolver.ResolveIP("::1") + + // Should be cached regardless of success + if resolver.GetCacheSize() != 1 { + t.Errorf("expected cache size 1, got %d", resolver.GetCacheSize()) + } + + // If successful, hostname should not be empty + if ok && hostname == "" { + t.Error("ResolveIP returned ok=true but empty hostname for IPv6") + } +} + +func TestGetCacheSize(t *testing.T) { + resolver := NewDNSResolver() + + if 
resolver.GetCacheSize() != 0 { + t.Errorf("expected initial cache size 0, got %d", resolver.GetCacheSize()) + } + + // Add entries + resolver.ResolveIP("127.0.0.1") + if resolver.GetCacheSize() != 1 { + t.Errorf("expected cache size 1 after first entry, got %d", resolver.GetCacheSize()) + } + + resolver.ResolveIP("192.168.1.1") + if resolver.GetCacheSize() != 2 { + t.Errorf("expected cache size 2 after second entry, got %d", resolver.GetCacheSize()) + } + + // Resolving same IP shouldn't increase size + resolver.ResolveIP("127.0.0.1") + if resolver.GetCacheSize() != 2 { + t.Errorf("expected cache size to remain 2, got %d", resolver.GetCacheSize()) + } +} diff --git a/internal/version/version.go b/internal/version/version.go new file mode 100644 index 0000000..cdd04c6 --- /dev/null +++ b/internal/version/version.go @@ -0,0 +1,4 @@ +package version + +// Version is the current version of prometheus-pusher +const Version = "0.0.0" diff --git a/internal/watcher/file_watcher.go b/internal/watcher/file_watcher.go new file mode 100644 index 0000000..63ccb08 --- /dev/null +++ b/internal/watcher/file_watcher.go @@ -0,0 +1,86 @@ +package watcher + +import ( + "context" + "fmt" + "os" + "time" +) + +// FileWatcher monitors a file for changes based on modification time. +type FileWatcher struct { + filePath string + pollInterval time.Duration + lastModTime time.Time +} + +// NewFileWatcher creates a new file watcher. +func NewFileWatcher(filePath string, pollInterval time.Duration) *FileWatcher { + return &FileWatcher{ + filePath: filePath, + pollInterval: pollInterval, + } +} + +// Watch starts watching the file and returns a channel that signals changes. +// If the file doesn't exist yet, it will wait until it appears. 
+func (fw *FileWatcher) Watch(ctx context.Context) (<-chan struct{}, error) { + changeChan := make(chan struct{}, 1) + + go func() { + defer close(changeChan) + ticker := time.NewTicker(fw.pollInterval) + defer ticker.Stop() + + fileExists := false + + // Wait for file to exist + for !fileExists { + select { + case <-ctx.Done(): + return + case <-ticker.C: + info, err := os.Stat(fw.filePath) + if err != nil { + // File doesn't exist yet, keep waiting + fmt.Printf("Waiting for file to exist: %s\n", fw.filePath) + continue + } + + // File now exists! + fileExists = true + fw.lastModTime = info.ModTime() + // Send initial change to process file + changeChan <- struct{}{} + fmt.Printf("File detected, starting watch: %s\n", fw.filePath) + } + } + + // Now watch for changes + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + info, err := os.Stat(fw.filePath) + if err != nil { + // File might have been deleted or is temporarily unavailable + continue + } + + modTime := info.ModTime() + if modTime.After(fw.lastModTime) { + fw.lastModTime = modTime + changeChan <- struct{}{} + } + } + } + }() + + return changeChan, nil +} + +// GetLastModTime returns the last known modification time of the file. +func (fw *FileWatcher) GetLastModTime() time.Time { + return fw.lastModTime +} diff --git a/logo.png b/logo.png Binary files differnew file mode 100644 index 0000000..a98b33d --- /dev/null +++ b/logo.png @@ -0,0 +1,31 @@ +#!/bin/bash + +# Simple script to run Epimetheus +# Automatically sets up port-forwarding and runs the binary + +set -e + +echo "Starting Epimetheus..." +echo "" +echo "Step 1: Setting up port-forward to Pushgateway..." +kubectl port-forward -n monitoring svc/pushgateway 9091:9091 > /tmp/pushgateway-port-forward.log 2>&1 & +PF_PID=$! + +# Wait for port-forward to be ready +sleep 2 + +echo "Step 2: Running epimetheus binary (realtime mode)..." 
+echo "Press Ctrl+C to stop" +echo "" + +# Run the binary in realtime mode and capture its exit status +./epimetheus -mode=realtime -continuous +EXIT_CODE=$? + +# Clean up port-forward +echo "" +echo "Cleaning up port-forward..." +kill $PF_PID 2>/dev/null || true + +echo "Done!" +exit $EXIT_CODE diff --git a/test-data/watch-clickhouse-test.csv b/test-data/watch-clickhouse-test.csv new file mode 100644 index 0000000..b4ca2b2 --- /dev/null +++ b/test-data/watch-clickhouse-test.csv @@ -0,0 +1,12 @@ +# Test dataset for watch mode with ClickHouse ingestion +# Tabular CSV: first row = headers, numeric columns = metrics, string columns = labels +# Use with: ./epimetheus -mode=watch -file=test-data/watch-clickhouse-test.csv -metric-name=watch_test -clickhouse=http://localhost:8123 -prometheus= + +instance,env,requests_total,latency_ms,active_connections +web1,prod,100,45.2,12 +web2,prod,85,38.1,8 +web3,prod,120,52.3,15 +api1,staging,200,120.5,25 +api2,staging,180,95.2,22 +db1,prod,50,5.1,3 +cache1,prod,500,2.3,50 diff --git a/verify-clickhouse.sh b/verify-clickhouse.sh new file mode 100755 index 0000000..5819f18 --- /dev/null +++ b/verify-clickhouse.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# Verify that epimetheus metrics were successfully ingested into ClickHouse. +# Usage: ./verify-clickhouse.sh [clickhouse_url] [table_name] +# Default: http://localhost:8123, epimetheus_metrics + +set -e + +CLICKHOUSE_URL="${1:-http://localhost:8123}" +TABLE="${2:-epimetheus_metrics}" + +echo "Verifying ClickHouse ingestion..." +echo " URL: $CLICKHOUSE_URL" +echo " Table: $TABLE" +echo "" + +# Check connectivity +if ! 
curl -sS "${CLICKHOUSE_URL}/ping" > /dev/null 2>&1; then + echo "ERROR: Cannot connect to ClickHouse at $CLICKHOUSE_URL" + echo " Make sure ClickHouse is running: sudo systemctl start clickhouse-server" + exit 1 +fi + +echo "✓ ClickHouse is reachable" +echo "" + +# Query 1: Row count +echo "--- Row count ---" +COUNT=$(curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20count()%20FROM%20${TABLE}" 2>/dev/null | tail -1) +if [ -z "$COUNT" ] || [ "$COUNT" = "0" ]; then + echo "ERROR: Table $TABLE is empty or does not exist" + echo " Run: ./epimetheus -mode=watch -file=test-data/watch-clickhouse-test.csv -metric-name=watch_test -clickhouse=$CLICKHOUSE_URL -prometheus=" + exit 1 +fi +echo "Total rows: $COUNT" +echo "" + +# Query 2: Distinct metrics +echo "--- Metrics in table ---" +curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20distinct%20metric%20FROM%20${TABLE}%20ORDER%20BY%20metric%20FORMAT%20PrettyCompact" 2>/dev/null +echo "" + +# Query 3: Sample data +echo "--- Sample rows (last 5) ---" +curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20metric%2C%20labels%2C%20value%2C%20timestamp%20FROM%20${TABLE}%20ORDER%20BY%20timestamp%20DESC%20LIMIT%205%20FORMAT%20PrettyCompact" 2>/dev/null +echo "" + +# Query 4: Aggregation by metric +echo "--- Rows per metric ---" +curl -sS "${CLICKHOUSE_URL}/?query=SELECT%20metric%2C%20count()%20AS%20cnt%20FROM%20${TABLE}%20GROUP%20BY%20metric%20ORDER%20BY%20cnt%20DESC%20FORMAT%20PrettyCompact" 2>/dev/null +echo "" + +echo "✅ ClickHouse verification complete - data is present and queryable" |
