diff options
| -rw-r--r-- | cmd/epimetheus/main.go | 390 |
1 file changed, 390 insertions, 0 deletions
diff --git a/cmd/epimetheus/main.go b/cmd/epimetheus/main.go new file mode 100644 index 0000000..4f89918 --- /dev/null +++ b/cmd/epimetheus/main.go @@ -0,0 +1,390 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "epimetheus/internal/config" + "epimetheus/internal/ingester" + "epimetheus/internal/metrics" + "epimetheus/internal/parser" + "epimetheus/internal/version" + "epimetheus/internal/watcher" +) + +func main() { + cfg := parseFlags() + + rand.Seed(time.Now().UnixNano()) + + ctx, cancel := createContextWithSignalHandler() + defer cancel() + + if err := run(ctx, cfg); err != nil { + log.Fatalf("Error: %v", err) + } +} + +// parseFlags parses command-line flags and returns a Config. +func parseFlags() config.Config { + cfg := config.NewConfig() + + showVersion := flag.Bool("version", false, "Print version and exit") + mode := flag.String("mode", "realtime", "Mode: realtime, historic, backfill, auto, or watch") + pushgatewayURL := flag.String("pushgateway", cfg.PushgatewayURL, "Pushgateway URL for realtime mode") + prometheusURL := flag.String("prometheus", cfg.PrometheusURL, "Prometheus remote write URL for historic mode") + jobName := flag.String("job", cfg.JobName, "Job name for metrics") + continuous := flag.Bool("continuous", false, "For realtime mode: push continuously every 15s") + + hoursAgo := flag.Int("hours-ago", cfg.HoursAgo, "For historic mode: how many hours ago (single datapoint)") + startHours := flag.Int("start-hours", cfg.StartHours, "For backfill: start time in hours ago") + endHours := flag.Int("end-hours", cfg.EndHours, "For backfill: end time in hours ago") + interval := flag.Int("interval", cfg.Interval, "For backfill: interval between datapoints in hours") + + inputFile := flag.String("file", "", "For auto/watch mode: input file(s) with metrics (comma-separated for multiple)") + inputFormat := flag.String("format", 
cfg.InputFormat, "For auto mode: input format (csv or json)") + metricName := flag.String("metric-name", "", "For watch mode: metric name (optional, auto-derived from filename)") + resolveIPLabels := flag.String("resolve-ip-labels", "", "For watch mode: comma-separated list of additional IP labels to resolve via DNS (default: ip)") + clickhouseURL := flag.String("clickhouse", "", "For watch mode: ClickHouse HTTP URL (e.g. http://localhost:8123) to also ingest metrics") + clickhouseTable := flag.String("clickhouse-table", "epimetheus_metrics", "For watch mode: ClickHouse table name") + + flag.Parse() + + if *showVersion { + fmt.Printf("epimetheus version %s\n", version.Version) + os.Exit(0) + } + + cfg.Mode = config.Mode(*mode) + cfg.PushgatewayURL = *pushgatewayURL + cfg.PrometheusURL = *prometheusURL + cfg.JobName = *jobName + cfg.Continuous = *continuous + cfg.HoursAgo = *hoursAgo + cfg.StartHours = *startHours + cfg.EndHours = *endHours + cfg.Interval = *interval + cfg.InputFile = *inputFile + cfg.InputFormat = *inputFormat + cfg.MetricName = *metricName + cfg.ClickHouseURL = *clickhouseURL + cfg.ClickHouseTable = *clickhouseTable + + // Parse resolve-ip-labels flag + cfg.ResolveIPLabels = []string{"ip"} // Always include default + if *resolveIPLabels != "" { + additionalLabels := strings.Split(*resolveIPLabels, ",") + for _, label := range additionalLabels { + label = strings.TrimSpace(label) + if label != "" && label != "ip" { // Avoid duplicates + cfg.ResolveIPLabels = append(cfg.ResolveIPLabels, label) + } + } + } + + return cfg +} + +// createContextWithSignalHandler creates a context that cancels on interrupt signals. 
+func createContextWithSignalHandler() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigChan + log.Printf("\nReceived interrupt signal, shutting down...") + cancel() + }() + + return ctx, cancel +} + +// run executes the appropriate mode based on configuration. +func run(ctx context.Context, cfg config.Config) error { + switch cfg.Mode { + case config.ModeRealtime: + return runRealtimeMode(ctx, cfg) + case config.ModeHistoric: + return runHistoricMode(ctx, cfg) + case config.ModeBackfill: + return runBackfillMode(ctx, cfg) + case config.ModeAuto: + return runAutoMode(ctx, cfg) + case config.ModeWatch: + return runWatchMode(ctx, cfg) + default: + return fmt.Errorf("unknown mode: %s (use realtime, historic, backfill, auto, or watch)", cfg.Mode) + } +} + +// runRealtimeMode runs the realtime ingestion mode. +func runRealtimeMode(ctx context.Context, cfg config.Config) error { + log.Printf("Starting Prometheus metrics pusher in REALTIME mode") + log.Printf("Pushgateway URL: %s", cfg.PushgatewayURL) + log.Printf("Job name: %s", cfg.JobName) + + collectors := metrics.NewCollectors() + pushgateway := ingester.NewPushgatewayIngester() + + if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + return fmt.Errorf("failed to push metrics: %w", err) + } + log.Printf("Successfully pushed metrics to Pushgateway") + + if cfg.Continuous { + return runContinuousMode(ctx, pushgateway, collectors, cfg) + } + + return nil +} + +// runContinuousMode pushes metrics continuously every 15 seconds. +func runContinuousMode(ctx context.Context, pushgateway ingester.PushgatewayIngester, collectors metrics.Collectors, cfg config.Config) error { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + log.Printf("Continuous mode: pushing metrics every 15 seconds. 
Press Ctrl+C to stop.") + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + if err := pushgateway.Ingest(ctx, collectors, cfg.PushgatewayURL, cfg.JobName); err != nil { + log.Printf("Error pushing metrics: %v", err) + } else { + log.Printf("Successfully pushed metrics to Pushgateway") + } + } + } +} + +// runHistoricMode runs the historic ingestion mode. +func runHistoricMode(ctx context.Context, cfg config.Config) error { + remoteWrite := ingester.NewRemoteWriteIngester() + return remoteWrite.IngestHistoric(ctx, cfg.PrometheusURL, cfg.HoursAgo) +} + +// runBackfillMode runs the backfill ingestion mode. +func runBackfillMode(ctx context.Context, cfg config.Config) error { + remoteWrite := ingester.NewRemoteWriteIngester() + return remoteWrite.Backfill(ctx, cfg.PrometheusURL, cfg.StartHours, cfg.EndHours, cfg.Interval) +} + +// runAutoMode runs the auto ingestion mode. +func runAutoMode(ctx context.Context, cfg config.Config) error { + log.Printf("🤖 AUTO mode: Automatically detecting timestamp age and choosing ingestion method") + + samples, err := loadSamples(ctx, cfg) + if err != nil { + return err + } + + logFileSource(cfg) + + collectors := metrics.NewCollectors() + autoIngester := ingester.NewAutoIngester(collectors) + + return autoIngester.Ingest(ctx, samples, cfg) +} + +// loadSamples loads samples from file or stdin based on configuration. +func loadSamples(ctx context.Context, cfg config.Config) ([]metrics.Sample, error) { + if cfg.InputFile != "" { + return parser.ParseFile(ctx, cfg.InputFile, cfg.InputFormat) + } + return parser.ParseStdin(ctx, cfg.InputFormat) +} + +// logFileSource logs the source of the input data. 
+func logFileSource(cfg config.Config) { + if cfg.InputFile != "" { + log.Printf("📁 Reading metrics from: %s (format: %s)", cfg.InputFile, cfg.InputFormat) + } else { + log.Printf("📥 Reading metrics from stdin (format: %s)", cfg.InputFormat) + } +} + +// runWatchMode runs the watch mode for monitoring CSV file changes. +// Supports watching multiple files by providing comma-separated list. +func runWatchMode(ctx context.Context, cfg config.Config) error { + if cfg.InputFile == "" { + return fmt.Errorf("watch mode requires -file flag") + } + + // Parse comma-separated file list + files := strings.Split(cfg.InputFile, ",") + for i := range files { + files[i] = strings.TrimSpace(files[i]) + } + + log.Printf("👁️ WATCH mode: Monitoring %d file(s)", len(files)) + if cfg.PrometheusURL != "" { + log.Printf("Prometheus URL: %s", cfg.PrometheusURL) + } + if cfg.ClickHouseURL != "" { + log.Printf("ClickHouse URL: %s (table: %s)", cfg.ClickHouseURL, cfg.ClickHouseTable) + } + log.Printf("Job name: %s", cfg.JobName) + log.Printf("Polling interval: 1 second") + + // Create ingesters (shared across all files) + var remoteWrite *ingester.RemoteWriteIngester + if cfg.PrometheusURL != "" { + rw := ingester.NewRemoteWriteIngester() + remoteWrite = &rw + } + + var clickhouse *ingester.ClickHouseIngester + if cfg.ClickHouseURL != "" { + ch := ingester.NewClickHouseIngester(cfg.ClickHouseURL, cfg.ClickHouseTable) + clickhouse = &ch + } + + if remoteWrite == nil && clickhouse == nil { + return fmt.Errorf("watch mode requires at least one of -prometheus or -clickhouse") + } + + // Use WaitGroup to manage multiple watchers + var wg sync.WaitGroup + errChan := make(chan error, len(files)) + + // Start a watcher for each file + for _, filePath := range files { + // Determine metric name: use -metric-name if provided (and only one file), + // otherwise extract from filename + metricName := cfg.MetricName + if metricName == "" || len(files) > 1 { + metricName = 
extractMetricNameFromFile(filePath) + } + + log.Printf("📁 Watching: %s → metric: %s", filePath, metricName) + + wg.Add(1) + go func(file, metric string) { + defer wg.Done() + if err := watchSingleFile(ctx, file, metric, remoteWrite, clickhouse, cfg); err != nil { + errChan <- fmt.Errorf("watcher for %s failed: %w", file, err) + } + }(filePath, metricName) + } + + log.Printf("✅ Watching for file changes. Press Ctrl+C to stop.") + + // Wait for all watchers to finish or context cancellation + go func() { + wg.Wait() + close(errChan) + }() + + // Collect any errors + var errors []error + for err := range errChan { + errors = append(errors, err) + log.Printf("⚠️ Watcher error: %v", err) + } + + if len(errors) > 0 { + return fmt.Errorf("watch mode completed with %d error(s)", len(errors)) + } + + return nil +} + +// watchSingleFile watches a single file for changes +func watchSingleFile(ctx context.Context, filePath string, metricName string, remoteWrite *ingester.RemoteWriteIngester, clickhouse *ingester.ClickHouseIngester, cfg config.Config) error { + // Create file watcher + fw := watcher.NewFileWatcher(filePath, 1*time.Second) + changeChan, err := fw.Watch(ctx) + if err != nil { + return fmt.Errorf("failed to start file watcher: %w", err) + } + + // Process file on each change + for { + select { + case <-ctx.Done(): + return ctx.Err() + case _, ok := <-changeChan: + if !ok { + // Channel closed, watcher stopped + return nil + } + + if err := processWatchedFile(ctx, filePath, metricName, fw.GetLastModTime(), remoteWrite, clickhouse, cfg); err != nil { + log.Printf("⚠️ Error processing %s: %v", filePath, err) + // Continue watching despite errors + continue + } + } + } +} + +// extractMetricNameFromFile extracts metric name from filename +// e.g., "/path/to/blockstore.csv" -> "blockstore" +func extractMetricNameFromFile(filePath string) string { + base := filepath.Base(filePath) + name := strings.TrimSuffix(base, filepath.Ext(base)) + return name +} + +// 
processWatchedFile reads and ingests metrics from the watched file +func processWatchedFile(ctx context.Context, filePath string, metricName string, modTime time.Time, remoteWrite *ingester.RemoteWriteIngester, clickhouse *ingester.ClickHouseIngester, cfg config.Config) error { + filename := filepath.Base(filePath) + log.Printf("📊 [%s] File changed, processing metrics (metric: %s)...", filename, metricName) + + // Open the file + file, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Parse using tabular CSV parser + tabularParser := parser.NewTabularCSVParser(metricName, modTime, cfg.ResolveIPLabels) + samples, err := tabularParser.Parse(ctx, file) + if err != nil { + return fmt.Errorf("failed to parse CSV: %w", err) + } + + if len(samples) == 0 { + log.Printf("⚠️ [%s] No samples found in file", filename) + return nil + } + + log.Printf("📦 [%s] Parsed %d samples (timestamp: %s)", filename, len(samples), modTime.Format(time.RFC3339)) + + // Push to Prometheus via Remote Write API (preserves timestamp) + if remoteWrite != nil { + if err := remoteWrite.Ingest(ctx, samples, cfg.PrometheusURL); err != nil { + return fmt.Errorf("failed to push metrics to Prometheus: %w", err) + } + log.Printf("✅ [%s] Successfully pushed %d samples to Prometheus", filename, len(samples)) + } + + // Push to ClickHouse + if clickhouse != nil { + if err := clickhouse.Ingest(ctx, samples); err != nil { + return fmt.Errorf("failed to push metrics to ClickHouse: %w", err) + } + log.Printf("✅ [%s] Successfully pushed %d samples to ClickHouse", filename, len(samples)) + } + + // Save DNS cache to disk after successful ingestion + tabularParser.SaveDNSCache() + + return nil +} + |
