summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-05 08:50:33 +0200
committerPaul Buetow <paul@buetow.org>2026-03-05 08:50:33 +0200
commit5d1b9f1062d38c301c0995ec6da980bdf5e48332 (patch)
tree81e1a8963ea66cf06164e89beb6cd2da0ee325f7
parentbb46cfbccea301721fb93485ea7169f5841feda3 (diff)
Improve lint/vet reliability and refactor client runtime/bootstrap
-rw-r--r--Makefile27
-rw-r--r--benchmarks/profile_runner.go58
-rw-r--r--benchmarks/testdata_generator.go48
-rw-r--r--cmd/dcat/main.go48
-rw-r--r--cmd/dgrep/main.go47
-rw-r--r--cmd/dmap/main.go50
-rw-r--r--cmd/dtail/main.go57
-rw-r--r--integrationtests/testhelpers.go40
-rw-r--r--internal/cli/runtime.go84
-rw-r--r--internal/config/initializer.go17
-rw-r--r--internal/config/initializer_test.go68
-rw-r--r--internal/mapr/logformat/custom1.go1
-rw-r--r--internal/mapr/logformat/custom2.go1
-rw-r--r--internal/server/handlers/authkeycommand_test.go6
-rw-r--r--internal/server/handlers/basehandler.go10
-rw-r--r--internal/server/handlers/readcommand_server.go65
-rw-r--r--internal/server/handlers/serverhandler.go2
-rw-r--r--internal/ssh/client/knownhostscallback.go10
-rw-r--r--internal/ssh/server/authkeystore.go4
-rw-r--r--internal/tools/common/data_generator.go32
-rw-r--r--internal/tools/profile/analyze.go48
21 files changed, 426 insertions, 297 deletions
diff --git a/Makefile b/Makefile
index 07fe185..659c9d1 100644
--- a/Makefile
+++ b/Makefile
@@ -38,18 +38,29 @@ clean:
@echo "Removing .prof files..."
find . -name "*.prof" -type f -delete
vet:
- find . -type d | grep -E -v '(./examples|./log|./doc)' | while read dir; do \
- echo ${GO} vet $$dir; \
- ${GO} vet $$dir; \
+ @set -e; \
+ packages=`${GO} list ./... | grep -v '^github.com/mimecast/dtail/benchmarks/cmd$$'`; \
+ for pkg in $$packages; do \
+ echo ${GO} vet $$pkg; \
+ ${GO} vet $$pkg; \
done
sh -c 'grep -R NEXT: .'
sh -c 'grep -R TODO: .'
lint:
- ${GO} get golang.org/x/lint/golint
- find . -type d | while read dir; do \
- echo golint $$dir; \
- golint $$dir; \
- done | grep -F .go:
+ @set -e; \
+ ${GO} install golang.org/x/lint/golint@v0.0.0-20241112194109-818c5a804067; \
+ gobin=`${GO} env GOBIN`; \
+ if [ -z "$$gobin" ]; then \
+ gobin=`${GO} env GOPATH`/bin; \
+ fi; \
+ golint_bin=$$gobin/golint; \
+ packages=`${GO} list ./... | grep -v '^github.com/mimecast/dtail/benchmarks/cmd$$'`; \
+ echo "Using $$golint_bin"; \
+ output=`$$golint_bin $$packages || true`; \
+ if [ -n "$$output" ]; then \
+ echo "$$output"; \
+ exit 1; \
+ fi
test:
${GO} clean -testcache
set -e; find . -name '*_test.go' | while read file; do dirname $$file; done | \
diff --git a/benchmarks/profile_runner.go b/benchmarks/profile_runner.go
index e7934dd..946c1ca 100644
--- a/benchmarks/profile_runner.go
+++ b/benchmarks/profile_runner.go
@@ -53,41 +53,41 @@ func RunProfiledCommand(b *testing.B, config ProfileConfig, tool string, args ..
// Build command path
cmdPath := filepath.Join("..", tool)
-
+
// Add profiling flags
profileArgs := []string{}
if config.EnableCPU || config.EnableMem {
profileArgs = append(profileArgs, "-profile")
profileArgs = append(profileArgs, "-profiledir", config.ProfileDir)
}
-
+
// Combine all arguments
allArgs := append(profileArgs, args...)
-
+
// Create command
cmd := exec.Command(cmdPath, allArgs...)
-
+
// Set up output capture
- outputFile := filepath.Join(config.ProfileDir, fmt.Sprintf("%s_output_%s.log",
+ outputFile := filepath.Join(config.ProfileDir, fmt.Sprintf("%s_output_%s.log",
tool, time.Now().Format("20060102_150405")))
output, err := os.Create(outputFile)
if err != nil {
return nil, fmt.Errorf("creating output file: %w", err)
}
defer output.Close()
-
+
cmd.Stdout = output
cmd.Stderr = output
-
+
// Record start time
start := time.Now()
-
+
// Run command
err = cmd.Run()
-
+
// Record duration
duration := time.Since(start)
-
+
result := &ProfileResult{
Tool: tool,
Operation: strings.Join(args, "_"),
@@ -95,12 +95,12 @@ func RunProfiledCommand(b *testing.B, config ProfileConfig, tool string, args ..
ExitCode: cmd.ProcessState.ExitCode(),
Error: err,
}
-
+
// Find generated profile files
timestamp := time.Now().Format("20060102_1504")
- profiles, _ := filepath.Glob(filepath.Join(config.ProfileDir,
+ profiles, _ := filepath.Glob(filepath.Join(config.ProfileDir,
fmt.Sprintf("%s_*_%s*.prof", tool, timestamp)))
-
+
for _, profile := range profiles {
if strings.Contains(profile, "_cpu_") {
result.CPUProfile = profile
@@ -110,14 +110,14 @@ func RunProfiledCommand(b *testing.B, config ProfileConfig, tool string, args ..
result.AllocProfile = profile
}
}
-
+
return result, nil
}
// ProfileBenchmark runs a benchmark with profiling enabled
func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) {
config := DefaultProfileConfig()
-
+
b.Run(name+"_profiled", func(b *testing.B) {
// Generate test data if needed
testFile := ""
@@ -130,7 +130,7 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) {
}
testFile = GenerateTestFile(b, testConfig)
defer os.Remove(testFile)
-
+
// Replace placeholder in args
for i, arg := range args {
if arg == "__TESTFILE__" {
@@ -138,13 +138,13 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) {
}
}
}
-
+
// Run profiled command
result, err := RunProfiledCommand(b, config, tool, args...)
if err != nil && result.ExitCode != 0 {
b.Fatalf("Command failed: %v", err)
}
-
+
// Report results
b.Logf("Profile run completed in %v", result.Duration)
if result.CPUProfile != "" {
@@ -156,7 +156,7 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) {
if result.AllocProfile != "" {
b.Logf("Allocation profile: %s", result.AllocProfile)
}
-
+
// Analyze profiles using dtail-tools
dtailToolsPath := filepath.Join("..", "dtail-tools")
if _, err := os.Stat(dtailToolsPath); err == nil {
@@ -173,14 +173,14 @@ func ProfileBenchmark(b *testing.B, name string, tool string, args ...string) {
// analyzeProfileWithTools runs dtail-tools profile analyze on a profile file
func analyzeProfileWithTools(b *testing.B, dtailToolsPath, profilePath, profileType string) {
b.Logf("\n%s Profile Analysis:", profileType)
-
+
cmd := exec.Command(dtailToolsPath, "profile", "-mode", "analyze", profilePath)
output, err := cmd.CombinedOutput()
if err != nil {
b.Logf("Failed to analyze profile: %v", err)
return
}
-
+
// Print analysis output
lines := strings.Split(string(output), "\n")
// Print first 10 lines of analysis
@@ -191,22 +191,24 @@ func analyzeProfileWithTools(b *testing.B, dtailToolsPath, profilePath, profileT
}
}
-// Profiling benchmarks for each tool
+// BenchmarkDCatWithProfiling runs dcat with profiling enabled.
func BenchmarkDCatWithProfiling(b *testing.B) {
ProfileBenchmark(b, "Simple", "dcat", "--plain", "--cfg", "none", "__TESTFILE__")
}
+// BenchmarkDGrepWithProfiling runs dgrep with profiling enabled.
func BenchmarkDGrepWithProfiling(b *testing.B) {
- ProfileBenchmark(b, "Regex", "dgrep", "--plain", "--cfg", "none",
+ ProfileBenchmark(b, "Regex", "dgrep", "--plain", "--cfg", "none",
"-regex", "error|warning", "__TESTFILE__")
}
+// BenchmarkDMapWithProfiling runs dmap with profiling enabled.
func BenchmarkDMapWithProfiling(b *testing.B) {
// First generate a CSV file for dmap
csvFile := filepath.Join(os.TempDir(), "dmap_test.csv")
generateCSVTestData(b, csvFile, 10000)
defer os.Remove(csvFile)
-
+
ProfileBenchmark(b, "Count", "dmap", "--plain", "--cfg", "none",
"-query", fmt.Sprintf("select count(*) from %s", csvFile))
}
@@ -218,17 +220,17 @@ func generateCSVTestData(b *testing.B, filename string, rows int) {
b.Fatalf("Failed to create CSV file: %v", err)
}
defer f.Close()
-
+
// Write header
fmt.Fprintln(f, "timestamp,user,action,duration")
-
+
// Write data
for i := 0; i < rows; i++ {
timestamp := time.Now().Add(time.Duration(i) * time.Second).Format("2006-01-02 15:04:05")
user := fmt.Sprintf("user%d", i%100)
action := []string{"login", "query", "logout"}[i%3]
duration := 100 + i%500
-
+
fmt.Fprintf(f, "%s,%s,%s,%d\n", timestamp, user, action, duration)
}
-} \ No newline at end of file
+}
diff --git a/benchmarks/testdata_generator.go b/benchmarks/testdata_generator.go
index 8ee4e29..74487aa 100644
--- a/benchmarks/testdata_generator.go
+++ b/benchmarks/testdata_generator.go
@@ -18,10 +18,12 @@ import (
// FileSize represents the size category of test files
type FileSize int
+// Supported benchmark file sizes.
const (
- Small FileSize = 10 * 1024 * 1024 // 10MB
- Medium FileSize = 100 * 1024 * 1024 // 100MB
- Large FileSize = 1024 * 1024 * 1024 // 1GB
+ // Small represents a 10MB test file.
+ Small FileSize = 10 * 1024 * 1024 // 10MB
+ Medium FileSize = 100 * 1024 * 1024 // 100MB
+ Large FileSize = 1024 * 1024 * 1024 // 1GB
)
func (fs FileSize) String() string {
@@ -40,7 +42,9 @@ func (fs FileSize) String() string {
// LogFormat represents different log format types
type LogFormat int
+// Supported synthetic log formats.
const (
+ // SimpleLogFormat contains plain text log lines.
SimpleLogFormat LogFormat = iota
MapReduceLogFormat
MixedLogFormat
@@ -49,7 +53,9 @@ const (
// CompressionType represents file compression options
type CompressionType int
+// Supported compression modes for generated test files.
const (
+ // NoCompression stores test data without compression.
NoCompression CompressionType = iota
GzipCompression
ZstdCompression
@@ -163,30 +169,30 @@ func generateCompressedFile(tmpFile, finalFile string, config TestDataConfig, cr
// writeLogLines generates log content based on config
func writeLogLines(w io.Writer, config TestDataConfig) error {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
-
+
// Calculate approximate lines needed
avgLineSize := 150 // bytes
totalLines := int(config.Size) / avgLineSize
-
+
// Pre-generate some template lines for variation
templateLines := generateTemplateLines(config.Format, config.LineVariation, config.Pattern, config.PatternRate, rng)
-
+
bytesWritten := 0
for i := 0; i < totalLines && bytesWritten < int(config.Size); i++ {
// Pick a random template line
line := templateLines[rng.Intn(len(templateLines))]
-
+
// Write with current timestamp
timestampedLine := strings.Replace(line, "{TIMESTAMP}", generateTimestamp(i), 1)
timestampedLine = strings.Replace(timestampedLine, "{COUNTER}", fmt.Sprintf("%d", i), 1)
-
+
n, err := fmt.Fprintln(w, timestampedLine)
if err != nil {
return err
}
bytesWritten += n
}
-
+
return nil
}
@@ -194,10 +200,10 @@ func writeLogLines(w io.Writer, config TestDataConfig) error {
func generateTemplateLines(format LogFormat, variation int, pattern string, patternRate int, rng *rand.Rand) []string {
numTemplates := max(10, variation) // At least 10 templates
templates := make([]string, 0, numTemplates)
-
+
for i := 0; i < numTemplates; i++ {
includePattern := pattern != "" && rng.Intn(100) < patternRate
-
+
switch format {
case SimpleLogFormat:
templates = append(templates, generateSimpleLogLine(i, includePattern, pattern, rng))
@@ -211,7 +217,7 @@ func generateTemplateLines(format LogFormat, variation int, pattern string, patt
}
}
}
-
+
return templates
}
@@ -219,14 +225,14 @@ func generateTemplateLines(format LogFormat, variation int, pattern string, patt
func generateSimpleLogLine(id int, includePattern bool, pattern string, rng *rand.Rand) string {
levels := []string{"INFO", "WARN", "ERROR", "DEBUG"}
level := levels[rng.Intn(len(levels))]
-
+
message := fmt.Sprintf("Processing request %d", id)
if includePattern && pattern != "" {
message = fmt.Sprintf("%s %s", message, pattern)
}
-
+
// Format: LEVEL|TIMESTAMP|THREAD|FILE:LINE|MESSAGE
- return fmt.Sprintf("%s|{TIMESTAMP}|thread-%d|app.go:%d|%s",
+ return fmt.Sprintf("%s|{TIMESTAMP}|thread-%d|app.go:%d|%s",
level, rng.Intn(10)+1, rng.Intn(1000)+1, message)
}
@@ -235,12 +241,12 @@ func generateMapReduceLogLine(id int, includePattern bool, pattern string, rng *
goroutines := rng.Intn(50) + 10
connections := rng.Intn(100)
lifetime := rng.Intn(1000) + 100
-
+
message := "MAPREDUCE:STATS"
if includePattern && pattern != "" {
message = fmt.Sprintf("%s|%s", message, pattern)
}
-
+
// Format matching the integration test data
return fmt.Sprintf("INFO|{TIMESTAMP}|1|stats.go:56|8|%d|7|0.%02d|471h%dm%ds|%s|currentConnections=%d|lifetimeConnections=%d",
goroutines, rng.Intn(100), rng.Intn(60), rng.Intn(60), message, connections, lifetime)
@@ -260,19 +266,19 @@ func CleanupBenchmarkFiles(pattern string) error {
if pattern == "" {
pattern = "dtail_bench_*.tmp*"
}
-
+
tempDir := os.TempDir()
matches, err := filepath.Glob(filepath.Join(tempDir, pattern))
if err != nil {
return err
}
-
+
for _, match := range matches {
if err := os.Remove(match); err != nil && !os.IsNotExist(err) {
return err
}
}
-
+
return nil
}
@@ -282,4 +288,4 @@ func max(a, b int) int {
return a
}
return b
-} \ No newline at end of file
+}
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go
index 4203809..9b8da4f 100644
--- a/cmd/dcat/main.go
+++ b/cmd/dcat/main.go
@@ -1,18 +1,12 @@
package main
import (
- "context"
"flag"
"os"
- "sync"
-
- "net/http"
- _ "net/http"
- _ "net/http/pprof"
+ "github.com/mimecast/dtail/internal/cli"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/profiling"
"github.com/mimecast/dtail/internal/source"
@@ -61,44 +55,18 @@ func main() {
version.PrintAndExit()
}
- ctx, cancel := context.WithCancel(context.Background())
- var wg sync.WaitGroup
- wg.Add(1)
- dlog.Start(ctx, &wg, source.Client)
-
- // Set up profiling
- profiler := profiling.NewProfiler(profileFlags.ToConfig("dcat"))
- defer profiler.Stop()
-
- if pprof != "" {
- dlog.Client.Info("Starting PProf", pprof)
- go func() {
- panic(http.ListenAndServe(pprof, nil))
- }()
- }
-
- // Log initial metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("startup")
- }
+ runtime := cli.NewClientRuntime(nil, profileFlags, "dcat")
+ runtime.StartPProf(pprof)
+ runtime.LogStartupMetrics()
client, err := clients.NewCatClient(args)
if err != nil {
+ runtime.Stop()
panic(err)
}
- status := client.Start(ctx, signal.InterruptCh(ctx))
-
- // Log final metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("shutdown")
- }
-
- // Stop profiler before exit
- profiler.Stop()
-
- cancel()
-
- wg.Wait()
+ status := client.Start(runtime.Context(), signal.InterruptCh(runtime.Context()))
+ runtime.LogShutdownMetrics()
+ runtime.Stop()
os.Exit(status)
}
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index 0002c89..0e4eb29 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -1,18 +1,12 @@
package main
import (
- "context"
"flag"
"os"
- "sync"
-
- "net/http"
- _ "net/http"
- _ "net/http/pprof"
+ "github.com/mimecast/dtail/internal/cli"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/profiling"
"github.com/mimecast/dtail/internal/source"
@@ -67,48 +61,23 @@ func main() {
version.PrintAndExit()
}
- ctx, cancel := context.WithCancel(context.Background())
- var wg sync.WaitGroup
- wg.Add(1)
- dlog.Start(ctx, &wg, source.Client)
-
- // Set up profiling
- profiler := profiling.NewProfiler(profileFlags.ToConfig("dgrep"))
- defer profiler.Stop()
+ runtime := cli.NewClientRuntime(nil, profileFlags, "dgrep")
if grep != "" {
args.RegexStr = grep
}
- if pprof != "" {
- dlog.Client.Info("Starting PProf", pprof)
- go func() {
- panic(http.ListenAndServe(pprof, nil))
- }()
- }
-
- // Log initial metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("startup")
- }
+ runtime.StartPProf(pprof)
+ runtime.LogStartupMetrics()
client, err := clients.NewGrepClient(args)
if err != nil {
+ runtime.Stop()
panic(err)
}
- status := client.Start(ctx, signal.InterruptCh(ctx))
-
- // Log final metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("shutdown")
- }
-
- // Stop profiler before exit
- profiler.Stop()
-
- cancel()
-
- wg.Wait()
+ status := client.Start(runtime.Context(), signal.InterruptCh(runtime.Context()))
+ runtime.LogShutdownMetrics()
+ runtime.Stop()
os.Exit(status)
}
diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go
index 498a09e..ca3981f 100644
--- a/cmd/dmap/main.go
+++ b/cmd/dmap/main.go
@@ -1,15 +1,10 @@
package main
import (
- "context"
"flag"
"os"
- "sync"
-
- "net/http"
- _ "net/http"
- _ "net/http/pprof"
+ "github.com/mimecast/dtail/internal/cli"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -67,44 +62,21 @@ func main() {
version.PrintAndExit()
}
- ctx, cancel := context.WithCancel(context.Background())
- var wg sync.WaitGroup
- wg.Add(1)
- dlog.Start(ctx, &wg, source.Client)
-
- // Set up profiling
- profiler := profiling.NewProfiler(profileFlags.ToConfig("dmap"))
- defer profiler.Stop()
-
- if pprof != "" {
- dlog.Client.Info("Starting PProf", pprof)
- go func() {
- panic(http.ListenAndServe(pprof, nil))
- }()
- }
-
- // Log initial metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("startup")
- }
+ runtime := cli.NewClientRuntime(nil, profileFlags, "dmap")
+ runtime.StartPProf(pprof)
+ runtime.LogStartupMetrics()
client, err := clients.NewMaprClient(args, clients.DefaultMode)
if err != nil {
+ runtime.Stop()
dlog.Client.FatalPanic(err)
}
- status := client.Start(ctx, signal.InterruptChWithCancel(ctx, cancel))
-
- // Log final metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("shutdown")
- }
-
- // Stop profiler before exit
- profiler.Stop()
-
- cancel()
-
- wg.Wait()
+ status := client.Start(
+ runtime.Context(),
+ signal.InterruptChWithCancel(runtime.Context(), runtime.Cancel),
+ )
+ runtime.LogShutdownMetrics()
+ runtime.Stop()
os.Exit(status)
}
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index 45c0275..ee34cc5 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -4,17 +4,13 @@ import (
"context"
"flag"
"fmt"
- "net/http"
- _ "net/http"
- _ "net/http/pprof"
"os"
- "sync"
"time"
+ "github.com/mimecast/dtail/internal/cli"
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/signal"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/profiling"
@@ -91,39 +87,27 @@ func main() {
}
}
- ctx, cancel := context.WithCancel(context.Background())
+ baseCtx := context.Background()
+ var timeoutCancel context.CancelFunc
if shutdownAfter > 0 {
// NEXT: This does not work (auto shutdown)
- ctx, cancel = context.WithTimeout(ctx, time.Duration(shutdownAfter)*time.Second)
- defer cancel()
+ baseCtx, timeoutCancel = context.WithTimeout(baseCtx, time.Duration(shutdownAfter)*time.Second)
}
- var wg sync.WaitGroup
- wg.Add(1)
- dlog.Start(ctx, &wg, source.Client)
-
- // Set up profiling
- profiler := profiling.NewProfiler(profileFlags.ToConfig("dtail"))
- defer profiler.Stop()
+ runtime := cli.NewClientRuntime(baseCtx, profileFlags, "dtail")
if checkHealth {
fmt.Println("WARN: DTail health check has moved to separate binary dtailhealth" +
" - please adjust the monitoring scripts!")
- cancel()
+ runtime.Stop()
+ if timeoutCancel != nil {
+ timeoutCancel()
+ }
os.Exit(1)
}
- if pprof != "" {
- dlog.Client.Info("Starting PProf", pprof)
- go func() {
- panic(http.ListenAndServe(pprof, nil))
- }()
- }
-
- // Log initial metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("startup")
- }
+ runtime.StartPProf(pprof)
+ runtime.LogStartupMetrics()
var client clients.Client
var err error
@@ -132,23 +116,24 @@ func main() {
switch args.QueryStr {
case "":
if client, err = clients.NewTailClient(args); err != nil {
+ runtime.Stop()
panic(err)
}
default:
if client, err = clients.NewMaprClient(args, clients.DefaultMode); err != nil {
+ runtime.Stop()
panic(err)
}
}
- status := client.Start(ctx, signal.InterruptChWithCancel(ctx, cancel))
-
- // Log final metrics if profiling is enabled
- if profileFlags.Enabled() {
- profiler.LogMetrics("shutdown")
+ status := client.Start(
+ runtime.Context(),
+ signal.InterruptChWithCancel(runtime.Context(), runtime.Cancel),
+ )
+ runtime.LogShutdownMetrics()
+ runtime.Stop()
+ if timeoutCancel != nil {
+ timeoutCancel()
}
-
- cancel()
-
- wg.Wait()
os.Exit(status)
}
diff --git a/integrationtests/testhelpers.go b/integrationtests/testhelpers.go
index 5368dfe..da0ecd8 100644
--- a/integrationtests/testhelpers.go
+++ b/integrationtests/testhelpers.go
@@ -52,27 +52,27 @@ func (tl *TestLogger) LogFileComparison(fileA, fileB, method string) {
func (tl *TestLogger) WriteLogFile() error {
tl.mu.Lock()
defer tl.mu.Unlock()
-
+
logFile := fmt.Sprintf("%s.log", tl.testName)
f, err := os.Create(logFile)
if err != nil {
return fmt.Errorf("failed to create log file: %w", err)
}
defer f.Close()
-
+
fmt.Fprintf(f, "Test: %s\n", tl.testName)
fmt.Fprintf(f, "Timestamp: %s\n\n", time.Now().Format(time.RFC3339))
-
+
fmt.Fprintf(f, "=== EXTERNAL COMMANDS EXECUTED (in order) ===\n")
for i, cmd := range tl.commandHistory {
fmt.Fprintf(f, "%d. %s\n", i+1, cmd)
}
-
+
fmt.Fprintf(f, "\n=== FILE COMPARISONS ===\n")
for _, comparison := range tl.fileComparisons {
fmt.Fprintf(f, "%s\n", comparison)
}
-
+
return nil
}
@@ -100,7 +100,7 @@ func cleanupTmpFiles(t *testing.T) {
t.Logf("Warning: failed to glob .tmp files: %v", err)
return
}
-
+
for _, file := range matches {
if err := os.Remove(file); err != nil {
t.Logf("Warning: failed to remove %s: %v", file, err)
@@ -137,7 +137,7 @@ func DefaultServerConfig() *ServerConfig {
}
// startTestServer starts a dserver with the given configuration
-func startTestServer(t *testing.T, ctx context.Context, cfg *ServerConfig) error {
+func startTestServer(ctx context.Context, t *testing.T, cfg *ServerConfig) error {
t.Helper()
if cfg == nil {
cfg = DefaultServerConfig()
@@ -232,7 +232,7 @@ func (ts *TestServer) Start(logLevel string) error {
// Give the server a moment to release the port
time.Sleep(100 * time.Millisecond)
})
- return startTestServer(ts.t, ts.ctx, &ServerConfig{
+ return startTestServer(ts.ctx, ts.t, &ServerConfig{
Port: ts.port,
BindAddress: ts.bindAddress,
LogLevel: logLevel,
@@ -252,7 +252,7 @@ func (ts *TestServer) StartWithConfig(cfg *ServerConfig) error {
}
cfg.Port = ts.port
cfg.BindAddress = ts.bindAddress
- return startTestServer(ts.t, ts.ctx, cfg)
+ return startTestServer(ts.ctx, ts.t, cfg)
}
// Address returns the server address in host:port format
@@ -288,7 +288,7 @@ func NewCommandArgs() *CommandArgs {
// ToSlice converts CommandArgs to a string slice for command execution
func (c *CommandArgs) ToSlice() []string {
args := []string{"--cfg", c.Config}
-
+
if c.LogLevel != "" {
args = append(args, "--logLevel", c.LogLevel)
}
@@ -301,10 +301,10 @@ func (c *CommandArgs) ToSlice() []string {
if c.NoColor {
args = append(args, "--noColor")
}
-
+
// Add ExtraArgs before server/files args for commands like dgrep where order matters
args = append(args, c.ExtraArgs...)
-
+
if len(c.Servers) > 0 {
args = append(args, "--servers", strings.Join(c.Servers, ","))
}
@@ -314,7 +314,7 @@ func (c *CommandArgs) ToSlice() []string {
if len(c.Files) > 0 {
args = append(args, "--files", strings.Join(c.Files, ","))
}
-
+
return args
}
@@ -341,7 +341,7 @@ func runDualModeTest(t *testing.T, test DualModeTest) {
// verifyFileExists checks if a file exists and is not empty
func verifyFileExists(t *testing.T, filename string) error {
t.Helper()
-
+
info, err := os.Stat(filename)
if err != nil {
return fmt.Errorf("file %s not created: %w", filename, err)
@@ -349,7 +349,7 @@ func verifyFileExists(t *testing.T, filename string) error {
if info.Size() == 0 {
return fmt.Errorf("file %s is empty", filename)
}
-
+
return nil
}
@@ -386,7 +386,7 @@ func verifyColoredOutput(t *testing.T, outFile string, expectServerMetadata bool
}
// runCommandAndVerify runs a command and verifies the output against an expected file
-func runCommandAndVerify(t *testing.T, ctx context.Context, outFile, expectedFile, cmd string, args ...string) error {
+func runCommandAndVerify(ctx context.Context, t *testing.T, outFile, expectedFile, cmd string, args ...string) error {
t.Helper()
_, err := runCommand(ctx, t, outFile, cmd, args...)
@@ -402,7 +402,7 @@ func runCommandAndVerify(t *testing.T, ctx context.Context, outFile, expectedFil
}
// runCommandAndVerifyContents runs a command and verifies the output contents (ignoring order)
-func runCommandAndVerifyContents(t *testing.T, ctx context.Context, outFile, expectedFile, cmd string, args ...string) error {
+func runCommandAndVerifyContents(ctx context.Context, t *testing.T, outFile, expectedFile, cmd string, args ...string) error {
t.Helper()
_, err := runCommand(ctx, t, outFile, cmd, args...)
@@ -456,16 +456,16 @@ func GetStandardTestPaths() *StandardTestPaths {
// verifyQueryFile checks if the query file contains the expected query content
func verifyQueryFile(t *testing.T, queryFile, expectedQuery string) error {
t.Helper()
-
+
content, err := os.ReadFile(queryFile)
if err != nil {
return fmt.Errorf("failed to read query file: %w", err)
}
-
+
actualQuery := string(content)
if actualQuery != expectedQuery {
return fmt.Errorf("query mismatch:\nExpected: %s\nActual: %s", expectedQuery, actualQuery)
}
-
+
return nil
}
diff --git a/internal/cli/runtime.go b/internal/cli/runtime.go
new file mode 100644
index 0000000..5e577c2
--- /dev/null
+++ b/internal/cli/runtime.go
@@ -0,0 +1,84 @@
+package cli
+
+import (
+ "context"
+ "net/http"
+ _ "net/http/pprof" // Register pprof handlers when runtime pprof endpoint is enabled.
+ "sync"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/profiling"
+ "github.com/mimecast/dtail/internal/source"
+)
+
+// ClientRuntime owns common client command runtime components.
+type ClientRuntime struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+ profiler *profiling.Profiler
+ profileEnabled bool
+}
+
+// NewClientRuntime starts logging and profiling for a client command.
+func NewClientRuntime(parent context.Context, profileFlags profiling.Flags, profileName string) *ClientRuntime {
+ if parent == nil {
+ parent = context.Background()
+ }
+ ctx, cancel := context.WithCancel(parent)
+ runtime := &ClientRuntime{
+ ctx: ctx,
+ cancel: cancel,
+ profiler: profiling.NewProfiler(profileFlags.ToConfig(profileName)),
+ profileEnabled: profileFlags.Enabled(),
+ }
+
+ runtime.wg.Add(1)
+ dlog.Start(ctx, &runtime.wg, source.Client)
+ return runtime
+}
+
+// Context returns the runtime context.
+func (r *ClientRuntime) Context() context.Context {
+ return r.ctx
+}
+
+// Cancel cancels the runtime context.
+func (r *ClientRuntime) Cancel() {
+ r.cancel()
+}
+
+// StartPProf starts the pprof server if an address is provided.
+func (r *ClientRuntime) StartPProf(address string) {
+ if address == "" {
+ return
+ }
+
+ dlog.Client.Info("Starting PProf", address)
+ go func() {
+ if err := http.ListenAndServe(address, nil); err != nil {
+ dlog.Client.Error("PProf server exited", err)
+ }
+ }()
+}
+
+// LogStartupMetrics logs startup profiling metrics when enabled.
+func (r *ClientRuntime) LogStartupMetrics() {
+ if r.profileEnabled {
+ r.profiler.LogMetrics("startup")
+ }
+}
+
+// LogShutdownMetrics logs shutdown profiling metrics when enabled.
+func (r *ClientRuntime) LogShutdownMetrics() {
+ if r.profileEnabled {
+ r.profiler.LogMetrics("shutdown")
+ }
+}
+
+// Stop stops profiling and logging runtime goroutines.
+func (r *ClientRuntime) Stop() {
+ r.profiler.Stop()
+ r.cancel()
+ r.wg.Wait()
+}
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index b540457..6038705 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -29,13 +29,18 @@ func (in *initializer) parseConfig(args *Args) error {
return in.parseSpecificConfig(args.ConfigFile)
}
- if homeDir, err := os.UserHomeDir(); err != nil {
- var paths []string
- paths = append(paths, fmt.Sprintf("%s/.config/dtail/dtail.conf", homeDir))
- paths = append(paths, fmt.Sprintf("%s/.dtail.conf", homeDir))
+ homeDir, err := os.UserHomeDir()
+ if err == nil && homeDir != "" {
+ paths := []string{
+ fmt.Sprintf("%s/.config/dtail/dtail.conf", homeDir),
+ fmt.Sprintf("%s/.dtail.conf", homeDir),
+ }
for _, configPath := range paths {
- if _, err := os.Stat(configPath); os.IsNotExist(err) {
- continue
+ if _, err := os.Stat(configPath); err != nil {
+ if os.IsNotExist(err) {
+ continue
+ }
+ return err
}
if err := in.parseSpecificConfig(configPath); err != nil {
return err
diff --git a/internal/config/initializer_test.go b/internal/config/initializer_test.go
new file mode 100644
index 0000000..ea6b229
--- /dev/null
+++ b/internal/config/initializer_test.go
@@ -0,0 +1,68 @@
+package config
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+)
+
+func TestParseConfigLoadsDefaultXDGConfig(t *testing.T) {
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ configPath := filepath.Join(home, ".config", "dtail", "dtail.conf")
+ if err := os.MkdirAll(filepath.Dir(configPath), 0o755); err != nil {
+ t.Fatalf("mkdir failed: %v", err)
+ }
+ writeTestConfig(t, configPath, `{"Common":{"LogLevel":"debug"}}`)
+
+ in := initializer{
+ Common: newDefaultCommonConfig(),
+ Server: newDefaultServerConfig(),
+ Client: newDefaultClientConfig(),
+ }
+
+ if err := in.parseConfig(&Args{}); err != nil {
+ t.Fatalf("parseConfig failed: %v", err)
+ }
+ if in.Common.LogLevel != "debug" {
+ t.Fatalf("expected log level debug, got %q", in.Common.LogLevel)
+ }
+}
+
+func TestParseConfigLoadsDefaultConfigsInOrder(t *testing.T) {
+ home := t.TempDir()
+ t.Setenv("HOME", home)
+
+ xdgPath := filepath.Join(home, ".config", "dtail", "dtail.conf")
+ if err := os.MkdirAll(filepath.Dir(xdgPath), 0o755); err != nil {
+ t.Fatalf("mkdir failed: %v", err)
+ }
+ writeTestConfig(t, xdgPath, `{"Common":{"Logger":"file","LogLevel":"warn"}}`)
+
+ homePath := filepath.Join(home, ".dtail.conf")
+ writeTestConfig(t, homePath, `{"Common":{"LogLevel":"error"}}`)
+
+ in := initializer{
+ Common: newDefaultCommonConfig(),
+ Server: newDefaultServerConfig(),
+ Client: newDefaultClientConfig(),
+ }
+
+ if err := in.parseConfig(&Args{}); err != nil {
+ t.Fatalf("parseConfig failed: %v", err)
+ }
+ if in.Common.LogLevel != "error" {
+ t.Fatalf("expected final log level error, got %q", in.Common.LogLevel)
+ }
+ if in.Common.Logger != "file" {
+ t.Fatalf("expected logger file from first config, got %q", in.Common.Logger)
+ }
+}
+
+func writeTestConfig(t *testing.T, path, body string) {
+ t.Helper()
+ if err := os.WriteFile(path, []byte(body), 0o644); err != nil {
+ t.Fatalf("write config failed: %v", err)
+ }
+}
diff --git a/internal/mapr/logformat/custom1.go b/internal/mapr/logformat/custom1.go
index 05e0867..e340dbf 100644
--- a/internal/mapr/logformat/custom1.go
+++ b/internal/mapr/logformat/custom1.go
@@ -2,6 +2,7 @@ package logformat
import "errors"
+// ErrCustom1NotImplemented indicates custom1 parser is only a template.
var ErrCustom1NotImplemented error = errors.New("custom1 log format is not implemented")
// Template for creating a custom log format.
diff --git a/internal/mapr/logformat/custom2.go b/internal/mapr/logformat/custom2.go
index cc8d5b9..4fddfcd 100644
--- a/internal/mapr/logformat/custom2.go
+++ b/internal/mapr/logformat/custom2.go
@@ -2,6 +2,7 @@ package logformat
import "errors"
+// ErrCustom2NotImplemented indicates custom2 parser is only a template.
var ErrCustom2NotImplemented error = errors.New("custom2 log format is not implemented")
// Template for creating a custom log format.
diff --git a/internal/server/handlers/authkeycommand_test.go b/internal/server/handlers/authkeycommand_test.go
index bb9488b..f510038 100644
--- a/internal/server/handlers/authkeycommand_test.go
+++ b/internal/server/handlers/authkeycommand_test.go
@@ -33,11 +33,11 @@ func TestHandleAuthKeyCommandSuccess(t *testing.T) {
if message := readServerMessage(t, handler.serverMessages); message != "AUTHKEY OK\n" {
t.Fatalf("Unexpected response: %q", message)
}
- if !sshserver.ServerAuthKeyStore().Has(handler.user.Name, key) {
+ if !sshserver.AuthKeys().Has(handler.user.Name, key) {
t.Fatalf("Expected key to be stored for user")
}
- sshserver.ServerAuthKeyStore().Remove(handler.user.Name, key)
+ sshserver.AuthKeys().Remove(handler.user.Name, key)
}
func TestHandleAuthKeyCommandFeatureDisabled(t *testing.T) {
@@ -51,7 +51,7 @@ func TestHandleAuthKeyCommandFeatureDisabled(t *testing.T) {
if message := readServerMessage(t, handler.serverMessages); message != "AUTHKEY ERR feature disabled\n" {
t.Fatalf("Unexpected response: %q", message)
}
- if sshserver.ServerAuthKeyStore().Has(handler.user.Name, key) {
+ if sshserver.AuthKeys().Has(handler.user.Name, key) {
t.Fatalf("Expected no key to be stored while feature is disabled")
}
}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index f21262e..d510139 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -128,13 +128,9 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
pool.RecycleBytesBuffer(line.Content)
line.Recycle()
- case <-time.After(time.Second):
- select {
- case <-h.done.Done():
- err = io.EOF
- return
- default:
- }
+ case <-h.done.Done():
+ err = io.EOF
+ return
}
return
}
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 650dcf2..6d7a095 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -8,25 +8,40 @@ import (
"github.com/mimecast/dtail/internal/mapr/server"
)
-type readCommandServer interface {
+type readCommandContext interface {
LogContext() interface{}
- SendServerMessage(message string)
+}
+
+type readCommandFiles interface {
CanReadFile(path string) bool
- ServerMessagesChannel() chan string
CatLimiter() chan struct{}
TailLimiter() chan struct{}
+}
+
+type readCommandMessages interface {
+ SendServerMessage(message string)
+ ServerMessagesChannel() chan string
Hostname() string
PlainOutput() bool
Serverless() bool
- TurboBoostDisabled() bool
- HasRegularAggregate() bool
RegisterAggregateLines(lines chan *line.Line)
SharedLinesChannel() chan *line.Line
+}
+
+type readCommandAggregates interface {
+ HasRegularAggregate() bool
TurboAggregate() *server.TurboAggregate
+}
+
+type readCommandLifecycle interface {
AddPendingFiles(delta int32) int32
CompletePendingFile() (remaining int32, activeCommands int32)
PendingAndActive() (pending int32, activeCommands int32)
TriggerShutdown()
+}
+
+type readCommandTurbo interface {
+ TurboBoostDisabled() bool
IsTurboMode() bool
EnableTurboMode()
HasTurboEOF() bool
@@ -35,6 +50,9 @@ type readCommandServer interface {
GetTurboChannel() chan []byte
TurboChannelLen() int
WaitForTurboEOFAck(timeout time.Duration) bool
+}
+
+type readCommandTiming interface {
ReadGlobRetryInterval() time.Duration
ReadRetryInterval() time.Duration
AggregateLinesChannelBufferSize() int
@@ -45,90 +63,120 @@ type readCommandServer interface {
TurboEOFAckTimeout() time.Duration
}
+type readCommandServer interface {
+ readCommandContext
+ readCommandFiles
+ readCommandMessages
+ readCommandAggregates
+ readCommandLifecycle
+ readCommandTurbo
+ readCommandTiming
+}
+
var _ readCommandServer = (*ServerHandler)(nil)
+// LogContext returns the logger context associated with the current user/session.
func (h *ServerHandler) LogContext() interface{} {
return h.user
}
+// SendServerMessage sends a formatted server message to the client.
func (h *ServerHandler) SendServerMessage(message string) {
h.sendln(h.serverMessages, message)
}
+// CanReadFile reports whether the current user can read the given path.
func (h *ServerHandler) CanReadFile(path string) bool {
return h.user.HasFilePermission(path, "readfiles")
}
+// ServerMessagesChannel returns the server message channel.
func (h *ServerHandler) ServerMessagesChannel() chan string {
return h.serverMessages
}
+// CatLimiter returns the concurrency limiter for cat/grep style reads.
func (h *ServerHandler) CatLimiter() chan struct{} {
return h.catLimiter
}
+// TailLimiter returns the concurrency limiter for tail reads.
func (h *ServerHandler) TailLimiter() chan struct{} {
return h.tailLimiter
}
+// Hostname returns the short hostname used for response formatting.
func (h *ServerHandler) Hostname() string {
return h.hostname
}
+// PlainOutput reports whether plain output mode is enabled.
func (h *ServerHandler) PlainOutput() bool {
return h.plain
}
+// Serverless reports whether the current session is running in serverless mode.
func (h *ServerHandler) Serverless() bool {
return h.serverless
}
+// TurboBoostDisabled reports whether turbo mode is disabled by configuration.
func (h *ServerHandler) TurboBoostDisabled() bool {
return h.serverCfg.TurboBoostDisable
}
+// HasRegularAggregate reports whether the regular map-reduce aggregate is active.
func (h *ServerHandler) HasRegularAggregate() bool {
return h.aggregate != nil
}
+// RegisterAggregateLines attaches a file line channel to the active aggregate.
func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) {
if h.aggregate != nil {
h.aggregate.NextLinesCh <- lines
}
}
+// SharedLinesChannel returns the shared outbound line channel.
func (h *ServerHandler) SharedLinesChannel() chan *line.Line {
return h.lines
}
+// TurboAggregate returns the turbo aggregate if enabled for the session.
func (h *ServerHandler) TurboAggregate() *server.TurboAggregate {
return h.turboAggregate
}
+// AddPendingFiles increments or decrements the pending file counter.
func (h *ServerHandler) AddPendingFiles(delta int32) int32 {
return atomic.AddInt32(&h.pendingFiles, delta)
}
+// CompletePendingFile marks one file as completed and returns pending/active counters.
func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) {
remaining = atomic.AddInt32(&h.pendingFiles, -1)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return remaining, activeCommands
}
+// PendingAndActive returns the current pending file and active command counts.
func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) {
pending = atomic.LoadInt32(&h.pendingFiles)
activeCommands = atomic.LoadInt32(&h.activeCommands)
return pending, activeCommands
}
+// TriggerShutdown starts the handler shutdown sequence.
func (h *ServerHandler) TriggerShutdown() {
h.shutdown()
}
+// FlushTurboData drains pending turbo data to the underlying writer.
func (h *ServerHandler) FlushTurboData() {
h.flushTurboData()
}
+// TurboEOFAckTimeout returns the timeout used while waiting for turbo EOF ACK.
func (h *ServerHandler) TurboEOFAckTimeout() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboEOFAckTimeoutMs, 2*time.Second)
}
@@ -147,22 +195,27 @@ func positiveIntOrDefault(value int, fallback int) int {
return value
}
+// ReadGlobRetryInterval returns the retry interval for glob expansion failures.
func (h *ServerHandler) ReadGlobRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second)
}
+// ReadRetryInterval returns the retry interval for repeated file reads.
func (h *ServerHandler) ReadRetryInterval() time.Duration {
return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
}
+// AggregateLinesChannelBufferSize returns the aggregate lines channel buffer size.
func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
}
+// TurboDataTransmissionDelay returns the delay used after turbo flushes.
func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration {
return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond)
}
+// TurboEOFWaitDuration returns the wait duration used before signaling turbo EOF.
func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond)
if fileCount <= 10 {
@@ -178,10 +231,12 @@ func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
return wait
}
+// ShutdownTurboSerializeWait returns the wait before final turbo shutdown checks.
func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond)
}
+// ShutdownIdleRecheckWait returns the wait used for the final idle recheck.
func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration {
return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond)
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 5d5a78c..078fd27 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -180,6 +180,6 @@ func (h *ServerHandler) handleAuthKeyCommand(_ context.Context, _ lcontext.LCont
return
}
- sshserver.ServerAuthKeyStore().Add(h.user.Name, pubKey)
+ sshserver.AuthKeys().Add(h.user.Name, pubKey)
h.sendln(h.serverMessages, "AUTHKEY OK")
}
diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go
index 9c73864..26ab245 100644
--- a/internal/ssh/client/knownhostscallback.go
+++ b/internal/ssh/client/knownhostscallback.go
@@ -243,8 +243,7 @@ func (c *KnownHostsCallback) trustHosts(hosts []unknownHost) {
}
// Read old known hosts file, to see which are old and new entries
- os.OpenFile(c.knownHostsPath, os.O_RDONLY|os.O_CREATE, 0666)
- oldFd, err := os.Open(c.knownHostsPath)
+ oldFd, err := os.OpenFile(c.knownHostsPath, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
panic(err)
}
@@ -257,9 +256,14 @@ func (c *KnownHostsCallback) trustHosts(hosts []unknownHost) {
address := strings.SplitN(line, " ", 2)[0]
if _, ok := addresses[address]; !ok {
- newFd.WriteString(fmt.Sprintf("%s\n", line))
+ if _, err := newFd.WriteString(fmt.Sprintf("%s\n", line)); err != nil {
+ panic(err)
+ }
}
}
+ if err := scanner.Err(); err != nil {
+ panic(err)
+ }
// Now, replace old known hosts file
if err := os.Rename(tmpKnownHostsPath, c.knownHostsPath); err != nil {
diff --git a/internal/ssh/server/authkeystore.go b/internal/ssh/server/authkeystore.go
index c4b89fe..c96b207 100644
--- a/internal/ssh/server/authkeystore.go
+++ b/internal/ssh/server/authkeystore.go
@@ -28,8 +28,8 @@ type AuthKeyStore struct {
now func() time.Time
}
-// ServerAuthKeyStore returns the process-wide auth key cache used by the SSH server.
-func ServerAuthKeyStore() *AuthKeyStore {
+// AuthKeys returns the process-wide auth key cache used by the SSH server.
+func AuthKeys() *AuthKeyStore {
return authKeyStore
}
diff --git a/internal/tools/common/data_generator.go b/internal/tools/common/data_generator.go
index 9446d8a..d3d4225 100644
--- a/internal/tools/common/data_generator.go
+++ b/internal/tools/common/data_generator.go
@@ -12,11 +12,13 @@ import (
// DataFormat represents the format of generated data
type DataFormat string
+// Supported data generator output formats.
const (
- FormatLog DataFormat = "log"
- FormatCSV DataFormat = "csv"
- FormatDTail DataFormat = "dtail"
- FormatMapReduce DataFormat = "mapreduce"
+ // FormatLog generates generic log lines.
+ FormatLog DataFormat = "log"
+ FormatCSV DataFormat = "csv"
+ FormatDTail DataFormat = "dtail"
+ FormatMapReduce DataFormat = "mapreduce"
)
// DataGenerator generates test data for profiling and benchmarking
@@ -112,7 +114,7 @@ func (g *DataGenerator) generateLogFile(filename string, targetSize int64) error
line := fmt.Sprintf("[%s] %s - User %s performed %s action (duration: %dms, status: %s)\n",
timestamp, level, user, action, duration, status)
-
+
n, err := writer.WriteString(line)
if err != nil {
return err
@@ -157,7 +159,7 @@ func (g *DataGenerator) generateCSVFile(filename string, targetSize int64) error
}
line := fmt.Sprintf("%s,%s,%s,%d,%s\n", timestamp, user, action, duration, status)
-
+
n, err := writer.WriteString(line)
if err != nil {
return err
@@ -180,14 +182,14 @@ func (g *DataGenerator) generateDTailFormatFile(filename string, targetSize int6
var currentSize int64
lineNum := 0
- hostnames := []string{"server01", "server02", "server03", "server04", "server05",
+ hostnames := []string{"server01", "server02", "server03", "server04", "server05",
"server06", "server07", "server08", "server09", "server10"}
for currentSize < targetSize {
lineNum++
hostname := hostnames[lineNum%len(hostnames)]
- timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
- 10+(lineNum/86400)%12, (lineNum/3600)%30+1,
+ timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
+ 10+(lineNum/86400)%12, (lineNum/3600)%30+1,
(lineNum/3600)%24, (lineNum/60)%60, lineNum%60)
goroutines := 10 + (lineNum % 50)
cgocalls := lineNum % 100
@@ -199,7 +201,7 @@ func (g *DataGenerator) generateDTailFormatFile(filename string, targetSize int6
line := fmt.Sprintf("INFO|%s|1|stats.go:56|%d|%d|%d|%.2f|%s|MAPREDUCE:STATS|hostname=%s|currentConnections=%d|lifetimeConnections=%d\n",
timestamp, cpus, goroutines, cgocalls, loadavg, uptime, hostname, currentConnections, lifetimeConnections)
-
+
n, err := writer.WriteString(line)
if err != nil {
return err
@@ -220,13 +222,13 @@ func (g *DataGenerator) generateDTailFormatFileWithLines(filename string, lines
writer := bufio.NewWriter(file)
defer writer.Flush()
- hostnames := []string{"server01", "server02", "server03", "server04", "server05",
+ hostnames := []string{"server01", "server02", "server03", "server04", "server05",
"server06", "server07", "server08", "server09", "server10"}
for i := 1; i <= lines; i++ {
hostname := hostnames[i%len(hostnames)]
- timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
- 10+(i/86400)%12, (i/3600)%30+1,
+ timestamp := fmt.Sprintf("%02d%02d-%02d%02d%02d",
+ 10+(i/86400)%12, (i/3600)%30+1,
(i/3600)%24, (i/60)%60, i%60)
goroutines := 10 + (i % 50)
cgocalls := i % 100
@@ -238,7 +240,7 @@ func (g *DataGenerator) generateDTailFormatFileWithLines(filename string, lines
line := fmt.Sprintf("INFO|%s|1|stats.go:56|%d|%d|%d|%.2f|%s|MAPREDUCE:STATS|hostname=%s|currentConnections=%d|lifetimeConnections=%d\n",
timestamp, cpus, goroutines, cgocalls, loadavg, uptime, hostname, currentConnections, lifetimeConnections)
-
+
if _, err := writer.WriteString(line); err != nil {
return err
}
@@ -263,4 +265,4 @@ func GenerateCSVFile(filename string, lines int) error {
// Estimate size based on average line length (about 50 bytes per line)
estimatedSize := int64(lines * 50)
return g.generateCSVFile(filename, estimatedSize)
-} \ No newline at end of file
+}
diff --git a/internal/tools/profile/analyze.go b/internal/tools/profile/analyze.go
index f27841a..59503b2 100644
--- a/internal/tools/profile/analyze.go
+++ b/internal/tools/profile/analyze.go
@@ -13,13 +13,13 @@ import (
"github.com/mimecast/dtail/internal/tools/common"
)
-// ProfileInfo holds information about a profile file
-type ProfileInfo struct {
- Path string
- Tool string
- Type string // cpu, mem, alloc
- ModTime string
- Size int64
+// Info holds information about a profile file.
+type Info struct {
+ Path string
+ Tool string
+ Type string // cpu, mem, alloc
+ ModTime string
+ Size int64
}
func runAnalyze(cfg *Config) error {
@@ -58,7 +58,7 @@ func listProfiles(cfg *Config) error {
}
// Group by tool
- byTool := make(map[string][]ProfileInfo)
+ byTool := make(map[string][]Info)
for _, p := range profiles {
byTool[p.Tool] = append(byTool[p.Tool], p)
}
@@ -74,26 +74,26 @@ func listProfiles(cfg *Config) error {
for _, tool := range tools {
fmt.Printf("\n%s profiles:\n", tool)
toolProfiles := byTool[tool]
-
+
// Sort by modification time (newest first)
sort.Slice(toolProfiles, func(i, j int) bool {
return toolProfiles[i].ModTime > toolProfiles[j].ModTime
})
for _, p := range toolProfiles {
- fmt.Printf(" %-8s %s %8s %s\n",
+ fmt.Printf(" %-8s %s %8s %s\n",
p.Type, p.ModTime, common.FormatSize(p.Size), filepath.Base(p.Path))
}
}
fmt.Printf("\nTotal: %d profiles\n", len(profiles))
fmt.Printf("\nUsage: dtail-tools profile -mode analyze <profile_file>\n")
-
+
return nil
}
-func findProfiles(dir string) ([]ProfileInfo, error) {
- var profiles []ProfileInfo
+func findProfiles(dir string) ([]Info, error) {
+ var profiles []Info
pattern := filepath.Join(dir, "*.prof")
matches, err := filepath.Glob(pattern)
@@ -117,7 +117,7 @@ func findProfiles(dir string) ([]ProfileInfo, error) {
tool := parts[0]
profType := parts[1]
- profiles = append(profiles, ProfileInfo{
+ profiles = append(profiles, Info{
Path: path,
Tool: tool,
Type: profType,
@@ -158,11 +158,11 @@ func analyzeProfile(profilePath string, args ...string) error {
func showTopFunctions(profilePath string, count int, isMemProfile bool) error {
args := []string{"tool", "pprof", "-top", fmt.Sprintf("-nodecount=%d", count)}
-
+
if isMemProfile {
args = append(args, "-alloc_space")
}
-
+
args = append(args, profilePath)
cmd := exec.Command("go", args...)
@@ -178,22 +178,22 @@ func showTopFunctions(profilePath string, count int, isMemProfile bool) error {
fmt.Printf("Top %d functions (sorted by flat):\n", count)
fmt.Println("================================================================")
-
+
for scanner.Scan() {
line := scanner.Text()
-
+
// Skip header lines
- if strings.HasPrefix(line, "File:") || strings.HasPrefix(line, "Type:") ||
- strings.HasPrefix(line, "Time:") || strings.HasPrefix(line, "Duration:") {
+ if strings.HasPrefix(line, "File:") || strings.HasPrefix(line, "Type:") ||
+ strings.HasPrefix(line, "Time:") || strings.HasPrefix(line, "Duration:") {
continue
}
-
+
// Start printing from the table header
if strings.Contains(line, "flat") && strings.Contains(line, "cum") {
inTop = true
fmt.Println("# Command: go " + strings.Join(args[1:], " "))
}
-
+
if inTop {
fmt.Println(line)
if line != "" {
@@ -216,6 +216,6 @@ func openWebProfile(profilePath string) error {
cmd := exec.Command("go", "tool", "pprof", "-http=:8080", profilePath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
-
+
return cmd.Run()
-} \ No newline at end of file
+}