diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-05 08:50:33 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-05 08:50:33 +0200 |
| commit | 5d1b9f1062d38c301c0995ec6da980bdf5e48332 (patch) | |
| tree | 81e1a8963ea66cf06164e89beb6cd2da0ee325f7 | |
| parent | bb46cfbccea301721fb93485ea7169f5841feda3 (diff) | |
Improve lint/vet reliability and refactor client runtime/bootstrap
| -rw-r--r-- | Makefile | 27 | ||||
| -rw-r--r-- | benchmarks/profile_runner.go | 58 | ||||
| -rw-r--r-- | benchmarks/testdata_generator.go | 48 | ||||
| -rw-r--r-- | cmd/dcat/main.go | 48 | ||||
| -rw-r--r-- | cmd/dgrep/main.go | 47 | ||||
| -rw-r--r-- | cmd/dmap/main.go | 50 | ||||
| -rw-r--r-- | cmd/dtail/main.go | 57 | ||||
| -rw-r--r-- | integrationtests/testhelpers.go | 40 | ||||
| -rw-r--r-- | internal/cli/runtime.go | 84 | ||||
| -rw-r--r-- | internal/config/initializer.go | 17 | ||||
| -rw-r--r-- | internal/config/initializer_test.go | 68 | ||||
| -rw-r--r-- | internal/mapr/logformat/custom1.go | 1 | ||||
| -rw-r--r-- | internal/mapr/logformat/custom2.go | 1 | ||||
| -rw-r--r-- | internal/server/handlers/authkeycommand_test.go | 6 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 10 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 65 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 2 | ||||
| -rw-r--r-- | internal/ssh/client/knownhostscallback.go | 10 | ||||
| -rw-r--r-- | internal/ssh/server/authkeystore.go | 4 | ||||
| -rw-r--r-- | internal/tools/common/data_generator.go | 32 | ||||
| -rw-r--r-- | internal/tools/profile/analyze.go | 48 |
21 files changed, 426 insertions, 297 deletions
@@ -38,18 +38,29 @@ clean: @echo "Removing .prof files..." find . -name "*.prof" -type f -delete vet: - find . -type d | grep -E -v '(./examples|./log|./doc)' | while read dir; do \ - echo ${GO} vet $$dir; \ - ${GO} vet $$dir; \ + @set -e; \ + packages=`${GO} list ./... | grep -v '^github.com/mimecast/dtail/benchmarks/cmd$$'`; \ + for pkg in $$packages; do \ + echo ${GO} vet $$pkg; \ + ${GO} vet $$pkg; \ done sh -c 'grep -R NEXT: .' sh -c 'grep -R TODO: .' lint: - ${GO} get golang.org/x/lint/golint - find . -type d | while read dir; do \ - echo golint $$dir; \ - golint $$dir; \ - done | grep -F .go: + @set -e; \ + ${GO} install golang.org/x/lint/golint@v0.0.0-20241112194109-818c5a804067; \ + gobin=`${GO} env GOBIN`; \ + if [ -z "$$gobin" ]; then \ + gobin=`${GO} env GOPATH`/bin; \ + fi; \ + golint_bin=$$gobin/golint; \ + packages=`${GO} list ./... | grep -v '^github.com/mimecast/dtail/benchmarks/cmd$$'`; \ + echo "Using $$golint_bin"; \ + output=`$$golint_bin $$packages || true`; \ + if [ -n "$$output" ]; then \ + echo "$$output"; \ + exit 1; \ + fi test: ${GO} clean -testcache set -e; find . -name '*_test.go' | while read file; do dirname $$file; done | \ diff --git a/benchmarks/profile_runner.go b/benchmarks/profile_runner.go index e7934dd..946c1ca 100644 --- a/benchmarks/profile_runner.go +++ b/benchmarks/profile_runner.go @@ -53,41 +53,41 @@ func RunProfiledCommand(b *testing.B, config ProfileConfig, tool string, args .. // Build command path cmdPath := filepath.Join("..", tool) - + // Add profiling flags profileArgs := []string{} if config.EnableCPU || config.EnableMem { profileArgs = append(profileArgs, "-profile") profileArgs = append(profileArgs, "-profiledir", config.ProfileDir) } - + // Combine all arguments allArgs := append(profileArgs, args...) - + // Create command cmd := exec.Command(cmdPath, allArgs...) - + // Set up output capture - outputFile := filepath.Join(config.ProfileDir, fmt.Sprintf("%s_output_%s.log", + outputFile := filepath.Join(config.ProfileDir, fmt.Sprintf("%s_output_%s.log", tool, time.Now().Format("20060102_150405"))) output, err := os.Create(outputFile) if err != nil { return nil, fmt.Errorf("creating output file: %w", err) } defer output.Close() - + cmd.Stdout = output cmd.Stderr = output - + // Record start time start := time.Now() - + // Run command err = cmd.Run() - + // Record duration duration := time.Since(start) - + result := &ProfileResult{ Tool: tool, Operation: strings.Join(args, "_"), @@ -95,12 +95,12 @@ func RunProfiledCommand(b *testing.B, config ProfileConfig, tool string, args .. ExitCode: cmd.ProcessState.ExitCode(), Error: err, } - + // Find generated profile files timestamp := time.Now().Format("20060102_1504") - profiles, _ := filepath.Glob(filepath.Join(config.ProfileDir, + profiles, _ := filepath.Glob(filepath.Join(config.ProfileDir, fmt.Sprintf("%s_*_%s*.prof", tool, timestamp))) - + for _, profile := range profiles { if strings.Contains(profile, "_cpu_") { result.CPUProfile = profile @@ -110,14 +110,14 @@ func RunProfiledCommand(b *testing.B, config ProfileConfig, tool string, args .. result.AllocProfile = profile } } - + return result, nil } // ProfileBenchmark runs a benchmark with profiling enabled func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) { config := DefaultProfileConfig() - + b.Run(name+"_profiled", func(b *testing.B) { // Generate test data if needed testFile := "" @@ -130,7 +130,7 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) { } testFile = GenerateTestFile(b, testConfig) defer os.Remove(testFile) - + // Replace placeholder in args for i, arg := range args { if arg == "__TESTFILE__" { @@ -138,13 +138,13 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) { } } } - + // Run profiled command result, err := RunProfiledCommand(b, config, tool, args...) if err != nil && result.ExitCode != 0 { b.Fatalf("Command failed: %v", err) } - + // Report results b.Logf("Profile run completed in %v", result.Duration) if result.CPUProfile != "" { @@ -156,7 +156,7 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) { if result.AllocProfile != "" { b.Logf("Allocation profile: %s", result.AllocProfile) } - + // Analyze profiles using dtail-tools dtailToolsPath := filepath.Join("..", "dtail-tools") if _, err := os.Stat(dtailToolsPath); err == nil { @@ -173,14 +173,14 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) { // analyzeProfileWithTools runs dtail-tools profile analyze on a profile file func analyzeProfileWithTools(b *testing.B, dtailToolsPath, profilePath, profileType string) { b.Logf("\n%s Profile Analysis:", profileType) - + cmd := exec.Command(dtailToolsPath, "profile", "-mode", "analyze", profilePath) output, err := cmd.CombinedOutput() if err != nil { b.Logf("Failed to analyze profile: %v", err) return } - + // Print analysis output lines := strings.Split(string(output), "\n") // Print first 10 lines of analysis @@ -191,22 +191,24 @@ func analyzeProfileWithTools(b *testing.B, dtailToolsPath, profilePath, profileT } } -// Profiling benchmarks for each tool +// BenchmarkDCatWithProfiling runs dcat with profiling enabled. func BenchmarkDCatWithProfiling(b *testing.B) { ProfileBenchmark(b, "Simple", "dcat", "--plain", "--cfg", "none", "__TESTFILE__") } +// BenchmarkDGrepWithProfiling runs dgrep with profiling enabled. func BenchmarkDGrepWithProfiling(b *testing.B) { - ProfileBenchmark(b, "Regex", "dgrep", "--plain", "--cfg", "none", + ProfileBenchmark(b, "Regex", "dgrep", "--plain", "--cfg", "none", "-regex", "error|warning", "__TESTFILE__") } +// BenchmarkDMapWithProfiling runs dmap with profiling enabled. func BenchmarkDMapWithProfiling(b *testing.B) { // First generate a CSV file for dmap csvFile := filepath.Join(os.TempDir(), "dmap_test.csv") generateCSVTestData(b, csvFile, 10000) defer os.Remove(csvFile) - + ProfileBenchmark(b, "Count", "dmap", "--plain", "--cfg", "none", "-query", fmt.Sprintf("select count(*) from %s", csvFile)) } @@ -218,17 +220,17 @@ func generateCSVTestData(b *testing.B, filename string, rows int) { b.Fatalf("Failed to create CSV file: %v", err) } defer f.Close() - + // Write header fmt.Fprintln(f, "timestamp,user,action,duration") - + // Write data for i := 0; i < rows; i++ { timestamp := time.Now().Add(time.Duration(i) * time.Second).Format("2006-01-02 15:04:05") user := fmt.Sprintf("user%d", i%100) action := []string{"login", "query", "logout"}[i%3] duration := 100 + i%500 - + fmt.Fprintf(f, "%s,%s,%s,%d\n", timestamp, user, action, duration) } -}
\ No newline at end of file +} diff --git a/benchmarks/testdata_generator.go b/benchmarks/testdata_generator.go index 8ee4e29..74487aa 100644 --- a/benchmarks/testdata_generator.go +++ b/benchmarks/testdata_generator.go @@ -18,10 +18,12 @@ import ( // FileSize represents the size category of test files type FileSize int +// Supported benchmark file sizes. const ( - Small FileSize = 10 * 1024 * 1024 // 10MB - Medium FileSize = 100 * 1024 * 1024 // 100MB - Large FileSize = 1024 * 1024 * 1024 // 1GB + // Small represents a 10MB test file. + Small FileSize = 10 * 1024 * 1024 // 10MB + Medium FileSize = 100 * 1024 * 1024 // 100MB + Large FileSize = 1024 * 1024 * 1024 // 1GB ) func (fs FileSize) String() string { @@ -40,7 +42,9 @@ func (fs FileSize) String() string { // LogFormat represents different log format types type LogFormat int +// Supported synthetic log formats. const ( + // SimpleLogFormat contains plain text log lines. SimpleLogFormat LogFormat = iota MapReduceLogFormat MixedLogFormat @@ -49,7 +53,9 @@ const ( // CompressionType represents file compression options type CompressionType int +// Supported compression modes for generated test files. const ( + // NoCompression stores test data without compression. NoCompression CompressionType = iota GzipCompression ZstdCompression @@ -163,30 +169,30 @@ func generateCompressedFile(tmpFile, finalFile string, config TestDataConfig, cr // writeLogLines generates log content based on config func writeLogLines(w io.Writer, config TestDataConfig) error { rng := rand.New(rand.NewSource(time.Now().UnixNano())) - + // Calculate approximate lines needed avgLineSize := 150 // bytes totalLines := int(config.Size) / avgLineSize - + // Pre-generate some template lines for variation templateLines := generateTemplateLines(config.Format, config.LineVariation, config.Pattern, config.PatternRate, rng) - + bytesWritten := 0 for i := 0; i < totalLines && bytesWritten < int(config.Size); i++ { // Pick a random template line line := templateLines[rng.Intn(len(templateLines))] - + // Write with current timestamp timestampedLine := strings.Replace(line, "{TIMESTAMP}", generateTimestamp(i), 1) timestampedLine = strings.Replace(timestampedLine, "{COUNTER}", fmt.Sprintf("%d", i), 1) - + n, err := fmt.Fprintln(w, timestampedLine) if err != nil { return err } bytesWritten += n } - + return nil } @@ -194,10 +200,10 @@ func writeLogLines(w io.Writer, config TestDataConfig) error { func generateTemplateLines(format LogFormat, variation int, pattern string, patternRate int, rng *rand.Rand) []string { numTemplates := max(10, variation) // At least 10 templates templates := make([]string, 0, numTemplates) - + for i := 0; i < numTemplates; i++ { includePattern := pattern != "" && rng.Intn(100) < patternRate - + switch format { case SimpleLogFormat: templates = append(templates, generateSimpleLogLine(i, includePattern, pattern, rng)) @@ -211,7 +217,7 @@ func generateTemplateLines(format LogFormat, variation int, pattern string, patt } } } - + return templates } @@ -219,14 +225,14 @@ func generateTemplateLines(format LogFormat, variation int, pattern string, patt func generateSimpleLogLine(id int, includePattern bool, pattern string, rng *rand.Rand) string { levels := []string{"INFO", "WARN", "ERROR", "DEBUG"} level := levels[rng.Intn(len(levels))] - + message := fmt.Sprintf("Processing request %d", id) if includePattern && pattern != "" { message = fmt.Sprintf("%s %s", message, pattern) } - + // Format: LEVEL|TIMESTAMP|THREAD|FILE:LINE|MESSAGE - return fmt.Sprintf("%s|{TIMESTAMP}|thread-%d|app.go:%d|%s", + return fmt.Sprintf("%s|{TIMESTAMP}|thread-%d|app.go:%d|%s", level, rng.Intn(10)+1, rng.Intn(1000)+1, message) } @@ -235,12 +241,12 @@ func generateMapReduceLogLine(id int, includePattern bool, pattern string, rng * goroutines := rng.Intn(50) + 10 connections := rng.Intn(100) lifetime := rng.Intn(1000) + 100 - + message := "MAPREDUCE:STATS" if includePattern && pattern != "" { message = fmt.Sprintf("%s|%s", message, pattern) } - + // Format matching the integration test data return fmt.Sprintf("INFO|{TIMESTAMP}|1|stats.go:56|8|%d|7|0.%02d|471h%dm%ds|%s|currentConnections=%d|lifetimeConnections=%d", goroutines, rng.Intn(100), rng.Intn(60), rng.Intn(60), message, connections, lifetime) @@ -260,19 +266,19 @@ func CleanupBenchmarkFiles(pattern string) error { if pattern == "" { pattern = "dtail_bench_*.tmp*" } - + tempDir := os.TempDir() matches, err := filepath.Glob(filepath.Join(tempDir, pattern)) if err != nil { return err } - + for _, match := range matches { if err := os.Remove(match); err != nil && !os.IsNotExist(err) { return err } } - + return nil } @@ -282,4 +288,4 @@ func max(a, b int) int { return a } return b -}
\ No newline at end of file +} diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index 4203809..9b8da4f 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -1,18 +1,12 @@ package main import ( - "context" "flag" "os" - "sync" - - "net/http" - _ "net/http" - _ "net/http/pprof" + "github.com/mimecast/dtail/internal/cli" "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/profiling" "github.com/mimecast/dtail/internal/source" @@ -61,44 +55,18 @@ func main() { version.PrintAndExit() } - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - dlog.Start(ctx, &wg, source.Client) - - // Set up profiling - profiler := profiling.NewProfiler(profileFlags.ToConfig("dcat")) - defer profiler.Stop() - - if pprof != "" { - dlog.Client.Info("Starting PProf", pprof) - go func() { - panic(http.ListenAndServe(pprof, nil)) - }() - } - - // Log initial metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("startup") - } + runtime := cli.NewClientRuntime(nil, profileFlags, "dcat") + runtime.StartPProf(pprof) + runtime.LogStartupMetrics() client, err := clients.NewCatClient(args) if err != nil { + runtime.Stop() panic(err) } - status := client.Start(ctx, signal.InterruptCh(ctx)) - - // Log final metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("shutdown") - } - - // Stop profiler before exit - profiler.Stop() - - cancel() - - wg.Wait() + status := client.Start(runtime.Context(), signal.InterruptCh(runtime.Context())) + runtime.LogShutdownMetrics() + runtime.Stop() os.Exit(status) } diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index 0002c89..0e4eb29 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -1,18 +1,12 @@ package main import ( - "context" "flag" "os" - "sync" - - "net/http" - _ "net/http" - _ "net/http/pprof" + "github.com/mimecast/dtail/internal/cli" "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/profiling" "github.com/mimecast/dtail/internal/source" @@ -67,48 +61,23 @@ func main() { version.PrintAndExit() } - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - dlog.Start(ctx, &wg, source.Client) - - // Set up profiling - profiler := profiling.NewProfiler(profileFlags.ToConfig("dgrep")) - defer profiler.Stop() + runtime := cli.NewClientRuntime(nil, profileFlags, "dgrep") if grep != "" { args.RegexStr = grep } - if pprof != "" { - dlog.Client.Info("Starting PProf", pprof) - go func() { - panic(http.ListenAndServe(pprof, nil)) - }() - } - - // Log initial metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("startup") - } + runtime.StartPProf(pprof) + runtime.LogStartupMetrics() client, err := clients.NewGrepClient(args) if err != nil { + runtime.Stop() panic(err) } - status := client.Start(ctx, signal.InterruptCh(ctx)) - - // Log final metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("shutdown") - } - - // Stop profiler before exit - profiler.Stop() - - cancel() - - wg.Wait() + status := client.Start(runtime.Context(), signal.InterruptCh(runtime.Context())) + runtime.LogShutdownMetrics() + runtime.Stop() os.Exit(status) } diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go index 498a09e..ca3981f 100644 --- a/cmd/dmap/main.go +++ b/cmd/dmap/main.go @@ -1,15 +1,10 @@ package main import ( - "context" "flag" "os" - "sync" - - "net/http" - _ "net/http" - _ "net/http/pprof" + "github.com/mimecast/dtail/internal/cli" "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" @@ -67,44 +62,21 @@ func main() { version.PrintAndExit() } - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - dlog.Start(ctx, &wg, source.Client) - - // Set up profiling - profiler := profiling.NewProfiler(profileFlags.ToConfig("dmap")) - defer profiler.Stop() - - if pprof != "" { - dlog.Client.Info("Starting PProf", pprof) - go func() { - panic(http.ListenAndServe(pprof, nil)) - }() - } - - // Log initial metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("startup") - } + runtime := cli.NewClientRuntime(nil, profileFlags, "dmap") + runtime.StartPProf(pprof) + runtime.LogStartupMetrics() client, err := clients.NewMaprClient(args, clients.DefaultMode) if err != nil { + runtime.Stop() dlog.Client.FatalPanic(err) } - status := client.Start(ctx, signal.InterruptChWithCancel(ctx, cancel)) - - // Log final metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("shutdown") - } - - // Stop profiler before exit - profiler.Stop() - - cancel() - - wg.Wait() + status := client.Start( + runtime.Context(), + signal.InterruptChWithCancel(runtime.Context(), runtime.Cancel), + ) + runtime.LogShutdownMetrics() + runtime.Stop() os.Exit(status) } diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index 45c0275..ee34cc5 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -4,17 +4,13 @@ import ( "context" "flag" "fmt" - "net/http" - _ "net/http" - _ "net/http/pprof" "os" - "sync" "time" + "github.com/mimecast/dtail/internal/cli" "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/signal" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/profiling" @@ -91,39 +87,27 @@ func main() { } } - ctx, cancel := context.WithCancel(context.Background()) + baseCtx := context.Background() + var timeoutCancel context.CancelFunc if shutdownAfter > 0 { // NEXT: This does not work (auto shutdown) - ctx, cancel = context.WithTimeout(ctx, time.Duration(shutdownAfter)*time.Second) - defer cancel() + baseCtx, timeoutCancel = context.WithTimeout(baseCtx, time.Duration(shutdownAfter)*time.Second) } - var wg sync.WaitGroup - wg.Add(1) - dlog.Start(ctx, &wg, source.Client) - - // Set up profiling - profiler := profiling.NewProfiler(profileFlags.ToConfig("dtail")) - defer profiler.Stop() + runtime := cli.NewClientRuntime(baseCtx, profileFlags, "dtail") if checkHealth { fmt.Println("WARN: DTail health check has moved to separate binary dtailhealth" + " - please adjust the monitoring scripts!") - cancel() + runtime.Stop() + if timeoutCancel != nil { + timeoutCancel() + } os.Exit(1) } - if pprof != "" { - dlog.Client.Info("Starting PProf", pprof) - go func() { - panic(http.ListenAndServe(pprof, nil)) - }() - } - - // Log initial metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("startup") - } + runtime.StartPProf(pprof) + runtime.LogStartupMetrics() var client clients.Client var err error @@ -132,23 +116,24 @@ func main() { switch args.QueryStr { case "": if client, err = clients.NewTailClient(args); err != nil { + runtime.Stop() panic(err) } default: if client, err = clients.NewMaprClient(args, clients.DefaultMode); err != nil { + runtime.Stop() panic(err) } } - status := client.Start(ctx, signal.InterruptChWithCancel(ctx, cancel)) - - // Log final metrics if profiling is enabled - if profileFlags.Enabled() { - profiler.LogMetrics("shutdown") + status := client.Start( + runtime.Context(), + signal.InterruptChWithCancel(runtime.Context(), runtime.Cancel), + ) + runtime.LogShutdownMetrics() + runtime.Stop() + if timeoutCancel != nil { + timeoutCancel() } - - cancel() - - wg.Wait() os.Exit(status) } diff --git a/integrationtests/testhelpers.go b/integrationtests/testhelpers.go index 5368dfe..da0ecd8 100644 --- a/integrationtests/testhelpers.go +++ b/integrationtests/testhelpers.go @@ -52,27 +52,27 @@ func (tl *TestLogger) LogFileComparison(fileA, fileB, method string) { func (tl *TestLogger) WriteLogFile() error { tl.mu.Lock() defer tl.mu.Unlock() - + logFile := fmt.Sprintf("%s.log", tl.testName) f, err := os.Create(logFile) if err != nil { return fmt.Errorf("failed to create log file: %w", err) } defer f.Close() - + fmt.Fprintf(f, "Test: %s\n", tl.testName) fmt.Fprintf(f, "Timestamp: %s\n\n", time.Now().Format(time.RFC3339)) - + fmt.Fprintf(f, "=== EXTERNAL COMMANDS EXECUTED (in order) ===\n") for i, cmd := range tl.commandHistory { fmt.Fprintf(f, "%d. %s\n", i+1, cmd) } - + fmt.Fprintf(f, "\n=== FILE COMPARISONS ===\n") for _, comparison := range tl.fileComparisons { fmt.Fprintf(f, "%s\n", comparison) } - + return nil } @@ -100,7 +100,7 @@ func cleanupTmpFiles(t *testing.T) { t.Logf("Warning: failed to glob .tmp files: %v", err) return } - + for _, file := range matches { if err := os.Remove(file); err != nil { t.Logf("Warning: failed to remove %s: %v", file, err) @@ -137,7 +137,7 @@ func DefaultServerConfig() *ServerConfig { } // startTestServer starts a dserver with the given configuration -func startTestServer(t *testing.T, ctx context.Context, cfg *ServerConfig) error { +func startTestServer(ctx context.Context, t *testing.T, cfg *ServerConfig) error { t.Helper() if cfg == nil { cfg = DefaultServerConfig() @@ -232,7 +232,7 @@ func (ts *TestServer) Start(logLevel string) error { // Give the server a moment to release the port time.Sleep(100 * time.Millisecond) }) - return startTestServer(ts.t, ts.ctx, &ServerConfig{ + return startTestServer(ts.ctx, ts.t, &ServerConfig{ Port: ts.port, BindAddress: ts.bindAddress, LogLevel: logLevel, @@ -252,7 +252,7 @@ func (ts *TestServer) StartWithConfig(cfg *ServerConfig) error { } cfg.Port = ts.port cfg.BindAddress = ts.bindAddress - return startTestServer(ts.t, ts.ctx, cfg) + return startTestServer(ts.ctx, ts.t, cfg) } // Address returns the server address in host:port format @@ -288,7 +288,7 @@ func NewCommandArgs() *CommandArgs { // ToSlice converts CommandArgs to a string slice for command execution func (c *CommandArgs) ToSlice() []string { args := []string{"--cfg", c.Config} - + if c.LogLevel != "" { args = append(args, "--logLevel", c.LogLevel) } @@ -301,10 +301,10 @@ func (c *CommandArgs) ToSlice() []string { if c.NoColor { args = append(args, "--noColor") } - + // Add ExtraArgs before server/files args for commands like dgrep where order matters args = append(args, c.ExtraArgs...) - + if len(c.Servers) > 0 { args = append(args, "--servers", strings.Join(c.Servers, ",")) } @@ -314,7 +314,7 @@ func (c *CommandArgs) ToSlice() []string { if len(c.Files) > 0 { args = append(args, "--files", strings.Join(c.Files, ",")) } - + return args } @@ -341,7 +341,7 @@ func runDualModeTest(t *testing.T, test DualModeTest) { // verifyFileExists checks if a file exists and is not empty func verifyFileExists(t *testing.T, filename string) error { t.Helper() - + info, err := os.Stat(filename) if err != nil { return fmt.Errorf("file %s not created: %w", filename, err) @@ -349,7 +349,7 @@ func verifyFileExists(t *testing.T, filename string) error { if info.Size() == 0 { return fmt.Errorf("file %s is empty", filename) } - + return nil } @@ -386,7 +386,7 @@ func verifyColoredOutput(t *testing.T, outFile string, expectServerMetadata bool } // runCommandAndVerify runs a command and verifies the output against an expected file -func runCommandAndVerify(t *testing.T, ctx context.Context, outFile, expectedFile, cmd string, args ...string) error { +func runCommandAndVerify(ctx context.Context, t *testing.T, outFile, expectedFile, cmd string, args ...string) error { t.Helper() _, err := runCommand(ctx, t, outFile, cmd, args...) @@ -402,7 +402,7 @@ func runCommandAndVerify(t *testing.T, ctx context.Context, outFile, expectedFil } // runCommandAndVerifyContents runs a command and verifies the output contents (ignoring order) -func runCommandAndVerifyContents(t *testing.T, ctx context.Context, outFile, expectedFile, cmd string, args ...string) error { +func runCommandAndVerifyContents(ctx context.Context, t *testing.T, outFile, expectedFile, cmd string, args ...string) error { t.Helper() _, err := runCommand(ctx, t, outFile, cmd, args...) @@ -456,16 +456,16 @@ func GetStandardTestPaths() *StandardTestPaths { // verifyQueryFile checks if the query file contains the expected query content func verifyQueryFile(t *testing.T, queryFile, expectedQuery string) error { t.Helper() - + content, err := os.ReadFile(queryFile) if err != nil { return fmt.Errorf("failed to read query file: %w", err) } - + actualQuery := string(content) if actualQuery != expectedQuery { return fmt.Errorf("query mismatch:\nExpected: %s\nActual: %s", expectedQuery, actualQuery) } - + return nil } diff --git a/internal/cli/runtime.go b/internal/cli/runtime.go new file mode 100644 index 0000000..5e577c2 --- /dev/null +++ b/internal/cli/runtime.go @@ -0,0 +1,84 @@ +package cli + +import ( + "context" + "net/http" + _ "net/http/pprof" // Register pprof handlers when runtime pprof endpoint is enabled. + "sync" + + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/profiling" + "github.com/mimecast/dtail/internal/source" +) + +// ClientRuntime owns common client command runtime components. +type ClientRuntime struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + profiler *profiling.Profiler + profileEnabled bool +} + +// NewClientRuntime starts logging and profiling for a client command. +func NewClientRuntime(parent context.Context, profileFlags profiling.Flags, profileName string) *ClientRuntime { + if parent == nil { + parent = context.Background() + } + ctx, cancel := context.WithCancel(parent) + runtime := &ClientRuntime{ + ctx: ctx, + cancel: cancel, + profiler: profiling.NewProfiler(profileFlags.ToConfig(profileName)), + profileEnabled: profileFlags.Enabled(), + } + + runtime.wg.Add(1) + dlog.Start(ctx, &runtime.wg, source.Client) + return runtime +} + +// Context returns the runtime context. +func (r *ClientRuntime) Context() context.Context { + return r.ctx +} + +// Cancel cancels the runtime context. +func (r *ClientRuntime) Cancel() { + r.cancel() +} + +// StartPProf starts the pprof server if an address is provided. +func (r *ClientRuntime) StartPProf(address string) { + if address == "" { + return + } + + dlog.Client.Info("Starting PProf", address) + go func() { + if err := http.ListenAndServe(address, nil); err != nil { + dlog.Client.Error("PProf server exited", err) + } + }() +} + +// LogStartupMetrics logs startup profiling metrics when enabled. +func (r *ClientRuntime) LogStartupMetrics() { + if r.profileEnabled { + r.profiler.LogMetrics("startup") + } +} + +// LogShutdownMetrics logs shutdown profiling metrics when enabled. +func (r *ClientRuntime) LogShutdownMetrics() { + if r.profileEnabled { + r.profiler.LogMetrics("shutdown") + } +} + +// Stop stops profiling and logging runtime goroutines. +func (r *ClientRuntime) Stop() { + r.profiler.Stop() + r.cancel() + r.wg.Wait() +} diff --git a/internal/config/initializer.go b/internal/config/initializer.go index b540457..6038705 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -29,13 +29,18 @@ func (in *initializer) parseConfig(args *Args) error { return in.parseSpecificConfig(args.ConfigFile) } - if homeDir, err := os.UserHomeDir(); err != nil { - var paths []string - paths = append(paths, fmt.Sprintf("%s/.config/dtail/dtail.conf", homeDir)) - paths = append(paths, fmt.Sprintf("%s/.dtail.conf", homeDir)) + homeDir, err := os.UserHomeDir() + if err == nil && homeDir != "" { + paths := []string{ + fmt.Sprintf("%s/.config/dtail/dtail.conf", homeDir), + fmt.Sprintf("%s/.dtail.conf", homeDir), + } for _, configPath := range paths { - if _, err := os.Stat(configPath); os.IsNotExist(err) { - continue + if _, err := os.Stat(configPath); err != nil { + if os.IsNotExist(err) { + continue + } + return err } if err := in.parseSpecificConfig(configPath); err != nil { return err diff --git a/internal/config/initializer_test.go b/internal/config/initializer_test.go new file mode 100644 index 0000000..ea6b229 --- /dev/null +++ b/internal/config/initializer_test.go @@ -0,0 +1,68 @@ +package config + +import ( + "os" + "path/filepath" + "testing" +) + +func TestParseConfigLoadsDefaultXDGConfig(t *testing.T) { + home := t.TempDir() + t.Setenv("HOME", home) + + configPath := filepath.Join(home, ".config", "dtail", "dtail.conf") + if err := os.MkdirAll(filepath.Dir(configPath), 0o755); err != nil { + t.Fatalf("mkdir failed: %v", err) + } + writeTestConfig(t, configPath, `{"Common":{"LogLevel":"debug"}}`) + + in := initializer{ + Common: newDefaultCommonConfig(), + Server: newDefaultServerConfig(), + Client: newDefaultClientConfig(), + } + + if err := in.parseConfig(&Args{}); err != nil { + t.Fatalf("parseConfig failed: %v", err) + } + if in.Common.LogLevel != "debug" { + t.Fatalf("expected log level debug, got %q", in.Common.LogLevel) + } +} + +func TestParseConfigLoadsDefaultConfigsInOrder(t *testing.T) { + home := t.TempDir() + t.Setenv("HOME", home) + + xdgPath := filepath.Join(home, ".config", "dtail", "dtail.conf") + if err := os.MkdirAll(filepath.Dir(xdgPath), 0o755); err != nil { + t.Fatalf("mkdir failed: %v", err) + } + writeTestConfig(t, xdgPath, `{"Common":{"Logger":"file","LogLevel":"warn"}}`) + + homePath := filepath.Join(home, ".dtail.conf") + writeTestConfig(t, homePath, `{"Common":{"LogLevel":"error"}}`) + + in := initializer{ + Common: newDefaultCommonConfig(), + Server: newDefaultServerConfig(), + Client: newDefaultClientConfig(), + } + + if err := in.parseConfig(&Args{}); err != nil { + t.Fatalf("parseConfig failed: %v", err) + } + if in.Common.LogLevel != "error" { + t.Fatalf("expected final log level error, got %q", in.Common.LogLevel) + } + if in.Common.Logger != "file" { + t.Fatalf("expected logger file from first config, got %q", in.Common.Logger) + } +} + +func writeTestConfig(t *testing.T, path, body string) { + t.Helper() + if err := os.WriteFile(path, []byte(body), 0o644); err != nil { + t.Fatalf("write config failed: %v", err) + } +} diff --git a/internal/mapr/logformat/custom1.go b/internal/mapr/logformat/custom1.go index 05e0867..e340dbf 100644 --- a/internal/mapr/logformat/custom1.go +++ b/internal/mapr/logformat/custom1.go @@ -2,6 +2,7 @@ package logformat import "errors" +// ErrCustom1NotImplemented indicates custom1 parser is only a template. var ErrCustom1NotImplemented error = errors.New("custom1 log format is not implemented") // Template for creating a custom log format. diff --git a/internal/mapr/logformat/custom2.go b/internal/mapr/logformat/custom2.go index cc8d5b9..4fddfcd 100644 --- a/internal/mapr/logformat/custom2.go +++ b/internal/mapr/logformat/custom2.go @@ -2,6 +2,7 @@ package logformat import "errors" +// ErrCustom2NotImplemented indicates custom2 parser is only a template. var ErrCustom2NotImplemented error = errors.New("custom2 log format is not implemented") // Template for creating a custom log format. diff --git a/internal/server/handlers/authkeycommand_test.go b/internal/server/handlers/authkeycommand_test.go index bb9488b..f510038 100644 --- a/internal/server/handlers/authkeycommand_test.go +++ b/internal/server/handlers/authkeycommand_test.go @@ -33,11 +33,11 @@ func TestHandleAuthKeyCommandSuccess(t *testing.T) { if message := readServerMessage(t, handler.serverMessages); message != "AUTHKEY OK\n" { t.Fatalf("Unexpected response: %q", message) } - if !sshserver.ServerAuthKeyStore().Has(handler.user.Name, key) { + if !sshserver.AuthKeys().Has(handler.user.Name, key) { t.Fatalf("Expected key to be stored for user") } - sshserver.ServerAuthKeyStore().Remove(handler.user.Name, key) + sshserver.AuthKeys().Remove(handler.user.Name, key) } func TestHandleAuthKeyCommandFeatureDisabled(t *testing.T) { @@ -51,7 +51,7 @@ func TestHandleAuthKeyCommandFeatureDisabled(t *testing.T) { if message := readServerMessage(t, handler.serverMessages); message != "AUTHKEY ERR feature disabled\n" { t.Fatalf("Unexpected response: %q", message) } - if sshserver.ServerAuthKeyStore().Has(handler.user.Name, key) { + if sshserver.AuthKeys().Has(handler.user.Name, key) { t.Fatalf("Expected no key to be stored while feature is disabled") } } diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index f21262e..d510139 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -128,13 +128,9 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { pool.RecycleBytesBuffer(line.Content) line.Recycle() - case <-time.After(time.Second): - select { - case <-h.done.Done(): - err = io.EOF - return - default: - } + case <-h.done.Done(): + err = io.EOF + return } return } diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 650dcf2..6d7a095 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -8,25 +8,40 @@ import ( "github.com/mimecast/dtail/internal/mapr/server" ) -type readCommandServer interface { +type readCommandContext interface { LogContext() interface{} - SendServerMessage(message string) +} + +type readCommandFiles interface { CanReadFile(path string) bool - ServerMessagesChannel() chan string CatLimiter() chan struct{} TailLimiter() chan struct{} +} + +type readCommandMessages interface { + SendServerMessage(message string) + ServerMessagesChannel() chan string Hostname() string PlainOutput() bool Serverless() bool - TurboBoostDisabled() bool - HasRegularAggregate() bool RegisterAggregateLines(lines chan *line.Line) SharedLinesChannel() chan *line.Line +} + +type readCommandAggregates interface { + HasRegularAggregate() bool TurboAggregate() *server.TurboAggregate +} + +type readCommandLifecycle interface { AddPendingFiles(delta int32) int32 CompletePendingFile() (remaining int32, activeCommands int32) PendingAndActive() (pending int32, activeCommands int32) TriggerShutdown() +} + +type readCommandTurbo interface { + TurboBoostDisabled() bool IsTurboMode() bool EnableTurboMode() HasTurboEOF() bool @@ -35,6 +50,9 @@ type readCommandServer interface { GetTurboChannel() chan []byte TurboChannelLen() int WaitForTurboEOFAck(timeout time.Duration) bool +} + +type readCommandTiming interface { ReadGlobRetryInterval() time.Duration ReadRetryInterval() time.Duration AggregateLinesChannelBufferSize() int @@ -45,90 +63,120 @@ type readCommandServer interface { TurboEOFAckTimeout() time.Duration } +type readCommandServer interface { + readCommandContext + readCommandFiles + readCommandMessages + readCommandAggregates + readCommandLifecycle + readCommandTurbo + readCommandTiming +} + var _ readCommandServer = (*ServerHandler)(nil) +// LogContext returns the logger context associated with the current user/session. func (h *ServerHandler) LogContext() interface{} { return h.user } +// SendServerMessage sends a formatted server message to the client. func (h *ServerHandler) SendServerMessage(message string) { h.sendln(h.serverMessages, message) } +// CanReadFile reports whether the current user can read the given path. func (h *ServerHandler) CanReadFile(path string) bool { return h.user.HasFilePermission(path, "readfiles") } +// ServerMessagesChannel returns the server message channel. func (h *ServerHandler) ServerMessagesChannel() chan string { return h.serverMessages } +// CatLimiter returns the concurrency limiter for cat/grep style reads. func (h *ServerHandler) CatLimiter() chan struct{} { return h.catLimiter } +// TailLimiter returns the concurrency limiter for tail reads. func (h *ServerHandler) TailLimiter() chan struct{} { return h.tailLimiter } +// Hostname returns the short hostname used for response formatting. func (h *ServerHandler) Hostname() string { return h.hostname } +// PlainOutput reports whether plain output mode is enabled. func (h *ServerHandler) PlainOutput() bool { return h.plain } +// Serverless reports whether the current session is running in serverless mode. func (h *ServerHandler) Serverless() bool { return h.serverless } +// TurboBoostDisabled reports whether turbo mode is disabled by configuration. func (h *ServerHandler) TurboBoostDisabled() bool { return h.serverCfg.TurboBoostDisable } +// HasRegularAggregate reports whether the regular map-reduce aggregate is active. func (h *ServerHandler) HasRegularAggregate() bool { return h.aggregate != nil } +// RegisterAggregateLines attaches a file line channel to the active aggregate. func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) { if h.aggregate != nil { h.aggregate.NextLinesCh <- lines } } +// SharedLinesChannel returns the shared outbound line channel. func (h *ServerHandler) SharedLinesChannel() chan *line.Line { return h.lines } +// TurboAggregate returns the turbo aggregate if enabled for the session. func (h *ServerHandler) TurboAggregate() *server.TurboAggregate { return h.turboAggregate } +// AddPendingFiles increments or decrements the pending file counter. func (h *ServerHandler) AddPendingFiles(delta int32) int32 { return atomic.AddInt32(&h.pendingFiles, delta) } +// CompletePendingFile marks one file as completed and returns pending/active counters. func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) { remaining = atomic.AddInt32(&h.pendingFiles, -1) activeCommands = atomic.LoadInt32(&h.activeCommands) return remaining, activeCommands } +// PendingAndActive returns the current pending file and active command counts. func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) { pending = atomic.LoadInt32(&h.pendingFiles) activeCommands = atomic.LoadInt32(&h.activeCommands) return pending, activeCommands } +// TriggerShutdown starts the handler shutdown sequence. func (h *ServerHandler) TriggerShutdown() { h.shutdown() } +// FlushTurboData drains pending turbo data to the underlying writer. func (h *ServerHandler) FlushTurboData() { h.flushTurboData() } +// TurboEOFAckTimeout returns the timeout used while waiting for turbo EOF ACK. func (h *ServerHandler) TurboEOFAckTimeout() time.Duration { return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second) } @@ -147,22 +195,27 @@ func positiveIntOrDefault(value int, fallback int) int { return value } +// ReadGlobRetryInterval returns the retry interval for glob expansion failures. func (h *ServerHandler) ReadGlobRetryInterval() time.Duration { return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second) } +// ReadRetryInterval returns the retry interval for repeated file reads. func (h *ServerHandler) ReadRetryInterval() time.Duration { return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second) } +// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size. func (h *ServerHandler) AggregateLinesChannelBufferSize() int { return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000) } +// TurboDataTransmissionDelay returns the delay used after turbo flushes. func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration { return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond) } +// TurboEOFWaitDuration returns the wait duration used before signaling turbo EOF. func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration { baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond) if fileCount <= 10 { @@ -178,10 +231,12 @@ func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration { return wait } +// ShutdownTurboSerializeWait returns the wait before final turbo shutdown checks. func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration { return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond) } +// ShutdownIdleRecheckWait returns the wait used for the final idle recheck. func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration { return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond) } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 5d5a78c..078fd27 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -180,6 +180,6 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont return } - sshserver.ServerAuthKeyStore().Add(h.user.Name, pubKey) + sshserver.AuthKeys().Add(h.user.Name, pubKey) h.sendln(h.serverMessages, "AUTHKEY OK") } diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go index 9c73864..26ab245 100644 --- a/internal/ssh/client/knownhostscallback.go +++ b/internal/ssh/client/knownhostscallback.go @@ -243,8 +243,7 @@ func (c *KnownHostsCallback) trustHosts(hosts []unknownHost) { } // Read old known hosts file, to see which are old and new entries - os.OpenFile(c.knownHostsPath, os.O_RDONLY|os.O_CREATE, 0666) - oldFd, err := os.Open(c.knownHostsPath) + oldFd, err := os.OpenFile(c.knownHostsPath, os.O_RDONLY|os.O_CREATE, 0600) if err != nil { panic(err) } @@ -257,9 +256,14 @@ func (c *KnownHostsCallback) trustHosts(hosts []unknownHost) { address := strings.SplitN(line, " ", 2)[0] if _, ok := addresses[address]; !ok { - newFd.WriteString(fmt.Sprintf("%s\n", line)) + if _, err := newFd.WriteString(fmt.Sprintf("%s\n", line)); err != nil { + panic(err) + } } } + if err := scanner.Err(); err != nil { + panic(err) + } // Now, replace old known hosts file if err := os.Rename(tmpKnownHostsPath, c.knownHostsPath); err != nil { diff --git a/internal/ssh/server/authkeystore.go b/internal/ssh/server/authkeystore.go index c4b89fe..c96b207 100644 --- a/internal/ssh/server/authkeystore.go +++ b/internal/ssh/server/authkeystore.go @@ -28,8 +28,8 @@ type AuthKeyStore struct { now func() time.Time } -// ServerAuthKeyStore returns the process-wide auth key cache used by the SSH server. -func ServerAuthKeyStore() *AuthKeyStore { +// AuthKeys returns the process-wide auth key cache used by the SSH server. +func AuthKeys() *AuthKeyStore { return authKeyStore } diff --git a/internal/tools/common/data_generator.go b/internal/tools/common/data_generator.go index 9446d8a..d3d4225 100644 --- a/internal/tools/common/data_generator.go +++ b/internal/tools/common/data_generator.go @@ -12,11 +12,13 @@ import ( // DataFormat represents the format of generated data type DataFormat string +// Supported data generator output formats. const ( - FormatLog DataFormat = "log" - FormatCSV DataFormat = "csv" - FormatDTail DataFormat = "dtail" - FormatMapReduce DataFormat = "mapreduce" + // FormatLog generates generic log lines. + FormatLog DataFormat = "log" + FormatCSV DataFormat = "csv" + FormatDTail DataFormat = "dtail" + FormatMapReduce DataFormat = "mapreduce" ) // DataGenerator generates test data for profiling and benchmarking @@ -112,7 +114,7 @@ func (g *DataGenerator) generateLogFile(filename string, targetSize int64) error line := fmt.Sprintf("[%s] %s - User %s performed %s action (duration: %dms, status: %s)\n", timestamp, level, user, action, duration, status) - + n, err := writer.WriteString(line) if err != nil { return err @@ -157,7 +159,7 @@ func (g *DataGenerator) generateCSVFile(filename string, targetSize int64) error } line := fmt.Sprintf("%s,%s,%s,%d,%s\n", timestamp, user, action, duration, status) - + n, err := writer.WriteString(line) if err != nil { return err @@ -180,14 +182,14 @@ func (g *DataGenerator) generateDTailFormatFile(filename string, targetSize int6 var currentSize int64 lineNum := 0 - hostnames := []string{"server01", "server02", "server03", "server04", "server05", + hostnames := []string{"server01", "server02", "server03", "server04", "server05", "server06", "server07", "server08", "server09", "server10"} for currentSize < targetSize { lineNum++ hostname := hostnames[lineNum%len(hostnames)] - timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d", - 10+(lineNum/86400)%12, (lineNum/3600)%30+1, + timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d", + 10+(lineNum/86400)%12, (lineNum/3600)%30+1, (lineNum/3600)%24, (lineNum/60)%60, lineNum%60) goroutines := 10 + (lineNum % 50) cgocalls := lineNum % 100 @@ -199,7 +201,7 @@ func (g *DataGenerator) generateDTailFormatFile(filename string, targetSize int6 line := fmt.Sprintf("INFO|%s|1|stats.go:56|%d|%d|%d|%.2f|%s|MAPREDUCE:STATS|hostname=%s|currentConnections=%d|lifetimeConnections=%d\n", timestamp, cpus, goroutines, cgocalls, loadavg, uptime, hostname, currentConnections, lifetimeConnections) - + n, err := writer.WriteString(line) if err != nil { return err @@ -220,13 +222,13 @@ func (g *DataGenerator) generateDTailFormatFileWithLines(filename string, lines writer := bufio.NewWriter(file) defer writer.Flush() - hostnames := []string{"server01", "server02", "server03", "server04", "server05", + hostnames := []string{"server01", "server02", "server03", "server04", "server05", "server06", "server07", "server08", "server09", "server10"} for i := 1; i <= lines; i++ { hostname := hostnames[i%len(hostnames)] - timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d", - 10+(i/86400)%12, (i/3600)%30+1, + timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d", + 10+(i/86400)%12, (i/3600)%30+1, (i/3600)%24, (i/60)%60, i%60) goroutines := 10 + (i % 50) cgocalls := i % 100 @@ -238,7 +240,7 @@ func (g *DataGenerator) generateDTailFormatFileWithLines(filename string, lines line := fmt.Sprintf("INFO|%s|1|stats.go:56|%d|%d|%d|%.2f|%s|MAPREDUCE:STATS|hostname=%s|currentConnections=%d|lifetimeConnections=%d\n", timestamp, cpus, goroutines, cgocalls, loadavg, uptime, hostname, currentConnections, lifetimeConnections) - + if _, err := writer.WriteString(line); err != nil { return err } @@ -263,4 +265,4 @@ func GenerateCSVFile(filename string, lines int) error { // Estimate size based on average line length (about 50 bytes per line) estimatedSize := int64(lines * 50) return g.generateCSVFile(filename, estimatedSize) -}
\ No newline at end of file +} diff --git a/internal/tools/profile/analyze.go b/internal/tools/profile/analyze.go index f27841a..59503b2 100644 --- a/internal/tools/profile/analyze.go +++ b/internal/tools/profile/analyze.go @@ -13,13 +13,13 @@ import ( "github.com/mimecast/dtail/internal/tools/common" ) -// ProfileInfo holds information about a profile file -type ProfileInfo struct { - Path string - Tool string - Type string // cpu, mem, alloc - ModTime string - Size int64 +// Info holds information about a profile file. +type Info struct { + Path string + Tool string + Type string // cpu, mem, alloc + ModTime string + Size int64 } func runAnalyze(cfg *Config) error { @@ -58,7 +58,7 @@ func listProfiles(cfg *Config) error { } // Group by tool - byTool := make(map[string][]ProfileInfo) + byTool := make(map[string][]Info) for _, p := range profiles { byTool[p.Tool] = append(byTool[p.Tool], p) } @@ -74,26 +74,26 @@ func listProfiles(cfg *Config) error { for _, tool := range tools { fmt.Printf("\n%s profiles:\n", tool) toolProfiles := byTool[tool] - + // Sort by modification time (newest first) sort.Slice(toolProfiles, func(i, j int) bool { return toolProfiles[i].ModTime > toolProfiles[j].ModTime }) for _, p := range toolProfiles { - fmt.Printf(" %-8s %s %8s %s\n", + fmt.Printf(" %-8s %s %8s %s\n", p.Type, p.ModTime, common.FormatSize(p.Size), filepath.Base(p.Path)) } } fmt.Printf("\nTotal: %d profiles\n", len(profiles)) fmt.Printf("\nUsage: dtail-tools profile -mode analyze <profile_file>\n") - + return nil } -func findProfiles(dir string) ([]ProfileInfo, error) { - var profiles []ProfileInfo +func findProfiles(dir string) ([]Info, error) { + var profiles []Info pattern := filepath.Join(dir, "*.prof") matches, err := filepath.Glob(pattern) @@ -117,7 +117,7 @@ func findProfiles(dir string) ([]ProfileInfo, error) { tool := parts[0] profType := parts[1] - profiles = append(profiles, ProfileInfo{ + profiles = append(profiles, Info{ Path: path, Tool: tool, Type: profType, @@ -158,11 +158,11 @@ func analyzeProfile(profilePath string, args ...string) error { func showTopFunctions(profilePath string, count int, isMemProfile bool) error { args := []string{"tool", "pprof", "-top", fmt.Sprintf("-nodecount=%d", count)} - + if isMemProfile { args = append(args, "-alloc_space") } - + args = append(args, profilePath) cmd := exec.Command("go", args...) @@ -178,22 +178,22 @@ func showTopFunctions(profilePath string, count int, isMemProfile bool) error { fmt.Printf("Top %d functions (sorted by flat):\n", count) fmt.Println("================================================================") - + for scanner.Scan() { line := scanner.Text() - + // Skip header lines - if strings.HasPrefix(line, "File:") || strings.HasPrefix(line, "Type:") || - strings.HasPrefix(line, "Time:") || strings.HasPrefix(line, "Duration:") { + if strings.HasPrefix(line, "File:") || strings.HasPrefix(line, "Type:") || + strings.HasPrefix(line, "Time:") || strings.HasPrefix(line, "Duration:") { continue } - + // Start printing from the table header if strings.Contains(line, "flat") && strings.Contains(line, "cum") { inTop = true fmt.Println("# Command: go " + strings.Join(args[1:], " ")) } - + if inTop { fmt.Println(line) if line != "" { @@ -216,6 +216,6 @@ func openWebProfile(profilePath string) error { cmd := exec.Command("go", "tool", "pprof", "-http=:8080", profilePath) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - + return cmd.Run() -}
\ No newline at end of file +} |
