summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-09 18:00:48 +0200
committerPaul Buetow <paul@buetow.org>2026-02-09 18:00:48 +0200
commitf3ea9a7a1f466b6109271c76eb58189d2a799998 (patch)
treefe4d769cc26a04633314b043fb04803c3a822b69
parent3fd46f3977fb650974e5e936cba362c787c00637 (diff)
add main
-rw-r--r--cmd/epimetheus/main.go390
1 file changed, 390 insertions, 0 deletions
diff --git a/cmd/epimetheus/main.go b/cmd/epimetheus/main.go
new file mode 100644
index 0000000..4f89918
--- /dev/null
+++ b/cmd/epimetheus/main.go
@@ -0,0 +1,390 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "os/signal"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "epimetheus/internal/config"
+ "epimetheus/internal/ingester"
+ "epimetheus/internal/metrics"
+ "epimetheus/internal/parser"
+ "epimetheus/internal/version"
+ "epimetheus/internal/watcher"
+)
+
+func main() {
+ cfg := parseFlags()
+
+ rand.Seed(time.Now().UnixNano())
+
+ ctx, cancel := createContextWithSignalHandler()
+ defer cancel()
+
+ if err := run(ctx, cfg); err != nil {
+ log.Fatalf("Error: %v", err)
+ }
+}
+
+// parseFlags parses command-line flags and returns a Config.
+func parseFlags() config.Config {
+ cfg := config.NewConfig()
+
+ showVersion := flag.Bool("version", false, "Print version and exit")
+ mode := flag.String("mode", "realtime", "Mode: realtime, historic, backfill, auto, or watch")
+ pushgatewayURL := flag.String("pushgateway", cfg.PushgatewayURL, "Pushgateway URL for realtime mode")
+ prometheusURL := flag.String("prometheus", cfg.PrometheusURL, "Prometheus remote write URL for historic mode")
+ jobName := flag.String("job", cfg.JobName, "Job name for metrics")
+ continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s")
+
+ hoursAgo := flag.Int("hours-ago", cfg.HoursAgo, "For historic mode: how many hours ago (single datapoint)")
+ startHours := flag.Int("start-hours", cfg.StartHours, "For backfill: start time in hours ago")
+ endHours := flag.Int("end-hours", cfg.EndHours, "For backfill: end time in hours ago")
+ interval := flag.Int("interval", cfg.Interval, "For backfill: interval between datapoints in hours")
+
+ inputFile := flag.String("file", "", "For auto/watch mode: input file(s) with metrics (comma-separated for multiple)")
+ inputFormat := flag.String("format", cfg.InputFormat, "For auto mode: input format (csv or json)")
+ metricName := flag.String("metric-name", "", "For watch mode: metric name (optional, auto-derived from filename)")
+ resolveIPLabels := flag.String("resolve-ip-labels", "", "For watch mode: comma-separated list of additional IP labels to resolve via DNS (default: ip)")
+ clickhouseURL := flag.String("clickhouse", "", "For watch mode: ClickHouse HTTP URL (e.g. http://localhost:8123) to also ingest metrics")
+ clickhouseTable := flag.String("clickhouse-table", "epimetheus_metrics", "For watch mode: ClickHouse table name")
+
+ flag.Parse()
+
+ if *showVersion {
+ fmt.Printf("epimetheus version %s\n", version.Version)
+ os.Exit(0)
+ }
+
+ cfg.Mode = config.Mode(*mode)
+ cfg.PushgatewayURL = *pushgatewayURL
+ cfg.PrometheusURL = *prometheusURL
+ cfg.JobName = *jobName
+ cfg.Continuous = *continuous
+ cfg.HoursAgo = *hoursAgo
+ cfg.StartHours = *startHours
+ cfg.EndHours = *endHours
+ cfg.Interval = *interval
+ cfg.InputFile = *inputFile
+ cfg.InputFormat = *inputFormat
+ cfg.MetricName = *metricName
+ cfg.ClickHouseURL = *clickhouseURL
+ cfg.ClickHouseTable = *clickhouseTable
+
+ // Parse resolve-ip-labels flag
+ cfg.ResolveIPLabels = []string{"ip"} // Always include default
+ if *resolveIPLabels != "" {
+ additionalLabels := strings.Split(*resolveIPLabels, ",")
+ for _, label := range additionalLabels {
+ label = strings.TrimSpace(label)
+ if label != "" && label != "ip" { // Avoid duplicates
+ cfg.ResolveIPLabels = append(cfg.ResolveIPLabels, label)
+ }
+ }
+ }
+
+ return cfg
+}
+
// createContextWithSignalHandler creates a context that cancels on interrupt signals.
// SIGINT and SIGTERM both trigger the same cancellation; the returned cancel
// function should be deferred by the caller to release resources.
func createContextWithSignalHandler() (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt, syscall.SIGTERM)

	// Wait for the first signal in the background, then cancel.
	go func() {
		<-interrupts
		log.Printf("\nReceived interrupt signal, shutting down...")
		cancel()
	}()

	return ctx, cancel
}
+
+// run executes the appropriate mode based on configuration.
+func run(ctx context.Context, cfg config.Config) error {
+ switch cfg.Mode {
+ case config.ModeRealtime:
+ return runRealtimeMode(ctx, cfg)
+ case config.ModeHistoric:
+ return runHistoricMode(ctx, cfg)
+ case config.ModeBackfill:
+ return runBackfillMode(ctx, cfg)
+ case config.ModeAuto:
+ return runAutoMode(ctx, cfg)
+ case config.ModeWatch:
+ return runWatchMode(ctx, cfg)
+ default:
+ return fmt.Errorf("unknown mode: %s (use realtime, historic, backfill, auto, or watch)", cfg.Mode)
+ }
+}
+
+// runRealtimeMode runs the realtime ingestion mode.
+func runRealtimeMode(ctx context.Context, cfg config.Config) error {
+ log.Printf("Starting Prometheus metrics pusher in REALTIME mode")
+ log.Printf("Pushgateway URL: %s", cfg.PushgatewayURL)
+ log.Printf("Job name: %s", cfg.JobName)
+
+ collectors := metrics.NewCollectors()
+ pushgateway := ingester.NewPushgatewayIngester()
+
+ if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
+ return fmt.Errorf("failed to push metrics: %w", err)
+ }
+ log.Printf("Successfully pushed metrics to Pushgateway")
+
+ if cfg.Continuous {
+ return runContinuousMode(ctx, pushgateway, collectors, cfg)
+ }
+
+ return nil
+}
+
+// runContinuousMode pushes metrics continuously every 15 seconds.
+func runContinuousMode(ctx context.Context, pushgateway ingester.PushgatewayIngester, collectors metrics.Collectors, cfg config.Config) error {
+ ticker := time.NewTicker(15 * time.Second)
+ defer ticker.Stop()
+
+ log.Printf("Continuous mode: pushing metrics every 15 seconds. Press Ctrl+C to stop.")
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil {
+ log.Printf("Error pushing metrics: %v", err)
+ } else {
+ log.Printf("Successfully pushed metrics to Pushgateway")
+ }
+ }
+ }
+}
+
+// runHistoricMode runs the historic ingestion mode.
+func runHistoricMode(ctx context.Context, cfg config.Config) error {
+ remoteWrite := ingester.NewRemoteWriteIngester()
+ return remoteWrite.IngestHistoric(ctx, cfg.PrometheusURL, cfg.HoursAgo)
+}
+
+// runBackfillMode runs the backfill ingestion mode.
+func runBackfillMode(ctx context.Context, cfg config.Config) error {
+ remoteWrite := ingester.NewRemoteWriteIngester()
+ return remoteWrite.Backfill(ctx, cfg.PrometheusURL, cfg.StartHours, cfg.EndHours, cfg.Interval)
+}
+
+// runAutoMode runs the auto ingestion mode.
+func runAutoMode(ctx context.Context, cfg config.Config) error {
+ log.Printf("🤖 AUTO mode: Automatically detecting timestamp age and choosing ingestion method")
+
+ samples, err := loadSamples(ctx, cfg)
+ if err != nil {
+ return err
+ }
+
+ logFileSource(cfg)
+
+ collectors := metrics.NewCollectors()
+ autoIngester := ingester.NewAutoIngester(collectors)
+
+ return autoIngester.Ingest(ctx, samples, cfg)
+}
+
+// loadSamples loads samples from file or stdin based on configuration.
+func loadSamples(ctx context.Context, cfg config.Config) ([]metrics.Sample, error) {
+ if cfg.InputFile != "" {
+ return parser.ParseFile(ctx, cfg.InputFile, cfg.InputFormat)
+ }
+ return parser.ParseStdin(ctx, cfg.InputFormat)
+}
+
+// logFileSource logs the source of the input data.
+func logFileSource(cfg config.Config) {
+ if cfg.InputFile != "" {
+ log.Printf("📁 Reading metrics from: %s (format: %s)", cfg.InputFile, cfg.InputFormat)
+ } else {
+ log.Printf("📥 Reading metrics from stdin (format: %s)", cfg.InputFormat)
+ }
+}
+
+// runWatchMode runs the watch mode for monitoring CSV file changes.
+// Supports watching multiple files by providing comma-separated list.
+func runWatchMode(ctx context.Context, cfg config.Config) error {
+ if cfg.InputFile == "" {
+ return fmt.Errorf("watch mode requires -file flag")
+ }
+
+ // Parse comma-separated file list
+ files := strings.Split(cfg.InputFile, ",")
+ for i := range files {
+ files[i] = strings.TrimSpace(files[i])
+ }
+
+ log.Printf("👁️ WATCH mode: Monitoring %d file(s)", len(files))
+ if cfg.PrometheusURL != "" {
+ log.Printf("Prometheus URL: %s", cfg.PrometheusURL)
+ }
+ if cfg.ClickHouseURL != "" {
+ log.Printf("ClickHouse URL: %s (table: %s)", cfg.ClickHouseURL, cfg.ClickHouseTable)
+ }
+ log.Printf("Job name: %s", cfg.JobName)
+ log.Printf("Polling interval: 1 second")
+
+ // Create ingesters (shared across all files)
+ var remoteWrite *ingester.RemoteWriteIngester
+ if cfg.PrometheusURL != "" {
+ rw := ingester.NewRemoteWriteIngester()
+ remoteWrite = &rw
+ }
+
+ var clickhouse *ingester.ClickHouseIngester
+ if cfg.ClickHouseURL != "" {
+ ch := ingester.NewClickHouseIngester(cfg.ClickHouseURL, cfg.ClickHouseTable)
+ clickhouse = &ch
+ }
+
+ if remoteWrite == nil && clickhouse == nil {
+ return fmt.Errorf("watch mode requires at least one of -prometheus or -clickhouse")
+ }
+
+ // Use WaitGroup to manage multiple watchers
+ var wg sync.WaitGroup
+ errChan := make(chan error, len(files))
+
+ // Start a watcher for each file
+ for _, filePath := range files {
+ // Determine metric name: use -metric-name if provided (and only one file),
+ // otherwise extract from filename
+ metricName := cfg.MetricName
+ if metricName == "" || len(files) > 1 {
+ metricName = extractMetricNameFromFile(filePath)
+ }
+
+ log.Printf("📁 Watching: %s → metric: %s", filePath, metricName)
+
+ wg.Add(1)
+ go func(file, metric string) {
+ defer wg.Done()
+ if err := watchSingleFile(ctx, file, metric, remoteWrite, clickhouse, cfg); err != nil {
+ errChan <- fmt.Errorf("watcher for %s failed: %w", file, err)
+ }
+ }(filePath, metricName)
+ }
+
+ log.Printf("✅ Watching for file changes. Press Ctrl+C to stop.")
+
+ // Wait for all watchers to finish or context cancellation
+ go func() {
+ wg.Wait()
+ close(errChan)
+ }()
+
+ // Collect any errors
+ var errors []error
+ for err := range errChan {
+ errors = append(errors, err)
+ log.Printf("⚠️ Watcher error: %v", err)
+ }
+
+ if len(errors) > 0 {
+ return fmt.Errorf("watch mode completed with %d error(s)", len(errors))
+ }
+
+ return nil
+}
+
+// watchSingleFile watches a single file for changes
+func watchSingleFile(ctx context.Context, filePath string, metricName string, remoteWrite *ingester.RemoteWriteIngester, clickhouse *ingester.ClickHouseIngester, cfg config.Config) error {
+ // Create file watcher
+ fw := watcher.NewFileWatcher(filePath, 1*time.Second)
+ changeChan, err := fw.Watch(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to start file watcher: %w", err)
+ }
+
+ // Process file on each change
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case _, ok := <-changeChan:
+ if !ok {
+ // Channel closed, watcher stopped
+ return nil
+ }
+
+ if err := processWatchedFile(ctx, filePath, metricName, fw.GetLastModTime(), remoteWrite, clickhouse, cfg); err != nil {
+ log.Printf("⚠️ Error processing %s: %v", filePath, err)
+ // Continue watching despite errors
+ continue
+ }
+ }
+ }
+}
+
// extractMetricNameFromFile derives a metric name from a file path by
// taking the base name and stripping its extension,
// e.g. "/path/to/blockstore.csv" -> "blockstore".
func extractMetricNameFromFile(filePath string) string {
	base := filepath.Base(filePath)
	return strings.TrimSuffix(base, filepath.Ext(base))
}
+
+// processWatchedFile reads and ingests metrics from the watched file
+func processWatchedFile(ctx context.Context, filePath string, metricName string, modTime time.Time, remoteWrite *ingester.RemoteWriteIngester, clickhouse *ingester.ClickHouseIngester, cfg config.Config) error {
+ filename := filepath.Base(filePath)
+ log.Printf("📊 [%s] File changed, processing metrics (metric: %s)...", filename, metricName)
+
+ // Open the file
+ file, err := os.Open(filePath)
+ if err != nil {
+ return fmt.Errorf("failed to open file: %w", err)
+ }
+ defer file.Close()
+
+ // Parse using tabular CSV parser
+ tabularParser := parser.NewTabularCSVParser(metricName, modTime, cfg.ResolveIPLabels)
+ samples, err := tabularParser.Parse(ctx, file)
+ if err != nil {
+ return fmt.Errorf("failed to parse CSV: %w", err)
+ }
+
+ if len(samples) == 0 {
+ log.Printf("⚠️ [%s] No samples found in file", filename)
+ return nil
+ }
+
+ log.Printf("📦 [%s] Parsed %d samples (timestamp: %s)", filename, len(samples), modTime.Format(time.RFC3339))
+
+ // Push to Prometheus via Remote Write API (preserves timestamp)
+ if remoteWrite != nil {
+ if err := remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil {
+ return fmt.Errorf("failed to push metrics to Prometheus: %w", err)
+ }
+ log.Printf("✅ [%s] Successfully pushed %d samples to Prometheus", filename, len(samples))
+ }
+
+ // Push to ClickHouse
+ if clickhouse != nil {
+ if err := clickhouse.Ingest(ctx, samples); err != nil {
+ return fmt.Errorf("failed to push metrics to ClickHouse: %w", err)
+ }
+ log.Printf("✅ [%s] Successfully pushed %d samples to ClickHouse", filename, len(samples))
+ }
+
+ // Save DNS cache to disk after successful ingestion
+ tabularParser.SaveDNSCache()
+
+ return nil
+}
+