diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-19 17:00:58 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-19 17:00:58 +0300 |
| commit | 655e348870712280eac03d9e32d027e74c119ced (patch) | |
| tree | fe9546bf96e0d8d7a30d87d7d4936dc4bea31ca8 | |
| parent | 0234fbac3490ccf2b9dca36292ad6459e990e0f5 (diff) | |
Refactor: Extract magic numbers as constants and reduce client code duplication
- Created internal/constants package with organized constant files:
- timeouts.go: All time duration constants (timeouts, intervals, delays)
- channels.go: Channel buffer size constants
- limits.go: Numeric limits and configuration values
- buffers.go: Buffer size constants in bytes
- Replaced all magic numbers throughout codebase with named constants:
- Time durations (2s, 3s, 5s, 10s, 100ms, 24h) now use descriptive constants
- Buffer sizes (8KB, 64KB, 1MB) extracted to constants
- Channel buffer sizes and multipliers
- Configuration limits (max connections, concurrency, etc.)
- Health check status codes
- Percentage calculations
- Reduced code duplication in client implementations:
- Created CommonClient to share functionality between CatClient, GrepClient, and TailClient
- All three clients now inherit from CommonClient
- Eliminated duplicate makeHandler() and makeCommands() methods
- Simplified client constructors
This refactoring improves code maintainability by centralizing configuration values
and reducing redundant code across similar client implementations.
32 files changed, 290 insertions, 140 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 29a9cfc..0edae86 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/clients/connectors" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/discovery" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/regex" @@ -16,7 +17,7 @@ import ( ) // Reusable timer for retry delays - PBO optimization -var retryTimer = time.NewTimer(2 * time.Second) +var retryTimer = time.NewTimer(constants.RetryTimerDuration) // This is the main client data structure. type baseClient struct { @@ -135,7 +136,7 @@ func (c *baseClient) startConnection(ctx context.Context, i int, default: } } - retryTimer.Reset(2 * time.Second) + retryTimer.Reset(constants.RetryTimerDuration) select { case <-retryTimer.C: case <-ctx.Done(): diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index bd65560..39f56d6 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -2,19 +2,14 @@ package clients import ( "errors" - "fmt" - "runtime" - "strings" - "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) // CatClient is a client for returning a whole file from the beginning to the end. type CatClient struct { - baseClient + CommonClient } // NewCatClient returns a new cat client. @@ -25,30 +20,10 @@ func NewCatClient(args config.Args) (*CatClient, error) { args.Mode = omode.CatClient c := CatClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: false, - }, + CommonClient: NewCommonClient(args, false), } c.init() c.makeConnections(c) return &c, nil } - -func (c CatClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c CatClient) makeCommands() (commands []string) { - regex, err := c.Regex.Serialize() - if err != nil { - dlog.Client.FatalPanic(err) - } - for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), c.Args.SerializeOptions(), file, regex)) - } - return -} diff --git a/internal/clients/common.go b/internal/clients/common.go new file mode 100644 index 0000000..2f35412 --- /dev/null +++ b/internal/clients/common.go @@ -0,0 +1,49 @@ +package clients + +import ( + "fmt" + "runtime" + "strings" + + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/omode" +) + +// CommonClient provides shared functionality for CatClient, GrepClient, and TailClient +type CommonClient struct { + baseClient +} + +// NewCommonClient creates a new common client with the specified configuration +func NewCommonClient(args config.Args, retry bool) CommonClient { + return CommonClient{ + baseClient: baseClient{ + Args: args, + throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), + retry: retry, + }, + } +} + +// makeHandler returns a standard client handler +func (c CommonClient) makeHandler(server string) handlers.Handler { + return handlers.NewClientHandler(server) +} + +// makeCommands generates commands based on the client mode +func (c CommonClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } + for _, file := range strings.Split(c.What, ",") { + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) + } + if c.Mode == omode.TailClient { + dlog.Client.Debug(commands) + } + return +}
\ No newline at end of file diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 7521c67..71f0220 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -2,20 +2,15 @@ package clients import ( "errors" - "fmt" - "runtime" - "strings" - "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) // GrepClient searches a remote file for all lines matching a regular // expression. Only the matching lines are displayed. type GrepClient struct { - baseClient + CommonClient } // NewGrepClient creates a new grep client. @@ -26,30 +21,10 @@ func NewGrepClient(args config.Args) (*GrepClient, error) { args.Mode = omode.GrepClient c := GrepClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: false, - }, + CommonClient: NewCommonClient(args, false), } c.init() c.makeConnections(c) return &c, nil } - -func (c GrepClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c GrepClient) makeCommands() (commands []string) { - regex, err := c.Regex.Serialize() - if err != nil { - dlog.Client.FatalPanic(err) - } - for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), c.Args.SerializeOptions(), file, regex)) - } - return -} diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 6f637a7..20e5fe3 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,6 +9,7 @@ import ( "time" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) @@ -47,7 +48,7 @@ func (h *baseHandler) SendMessage(command string) error { select { case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): - case <-time.After(time.Second * 5): + case <-time.After(constants.HandlerTimeout): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): return nil diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 47b594e..e602c39 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) @@ -22,7 +23,7 @@ func NewHealthHandler(server string) *HealthHandler { server: server, shellStarted: false, commands: make(chan string), - status: 2, // Assume CRITICAL status by default. + status: constants.HealthCriticalStatus, // Assume CRITICAL status by default. done: internal.NewDone(), }, } @@ -51,6 +52,6 @@ func (h *HealthHandler) handleMessage(message string) { s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] if message == "OK" { - h.baseHandler.status = 0 + h.baseHandler.status = constants.HealthOKStatus } } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 6362028..1f587ce 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -11,6 +11,7 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" @@ -164,12 +165,12 @@ func (c *MaprClient) printResults() { var result string var err error var numRows int - rowsLimit := -1 + rowsLimit := constants.MapReduceUnlimited - if c.query.Limit == -1 { + if c.query.Limit == constants.MapReduceUnlimited { // Limit output to 10 rows when the result is printed to stdout. // This can be overriden with the limit clause though. - rowsLimit = 10 + rowsLimit = constants.DefaultMapReduceRowsLimit } if c.cumulative { diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 9a17899..7160357 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -10,12 +10,13 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) // Reusable timer to reduce allocations - PBO optimization -var statsTimer = time.NewTimer(3 * time.Second) +var statsTimer = time.NewTimer(constants.StatsTimerDuration) // Used to collect and display various client stats. type stats struct { @@ -55,7 +56,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, default: } } - statsTimer.Reset(3 * time.Second) + statsTimer.Reset(constants.StatsTimerDuration) select { case message := <-statsCh: @@ -104,7 +105,7 @@ func (s *stats) printStatsDueInterrupt(messages []string) { } fmt.Println(fmt.Sprintf(" %s", message)) } - time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) + time.Sleep(time.Second * time.Duration(constants.InterruptTimeoutSeconds)) dlog.Client.Resume() } @@ -149,7 +150,7 @@ func (s *stats) numConnected() int { func percentOf(total float64, value float64) float64 { if total == 0 || total == value { - return 100 + return constants.PercentageMultiplier } - return value / (total / 100.0) + return value / (total / constants.PercentageMultiplier) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 35c01d4..f71da41 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -1,50 +1,23 @@ package clients import ( - "fmt" - "runtime" - "strings" - - "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) // TailClient is used for tailing remote log files (opening, seeking to the end and returning only new incoming lines). type TailClient struct { - baseClient + CommonClient } // NewTailClient returns a new TailClient. func NewTailClient(args config.Args) (*TailClient, error) { args.Mode = omode.TailClient c := TailClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: true, - }, + CommonClient: NewCommonClient(args, true), } c.init() c.makeConnections(c) return &c, nil } - -func (c TailClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c TailClient) makeCommands() (commands []string) { - regex, err := c.Regex.Serialize() - if err != nil { - dlog.Client.FatalPanic(err) - } - for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", - c.Mode.String(), c.Args.SerializeOptions(), file, regex)) - } - dlog.Client.Debug(commands) - return -} diff --git a/internal/config/config.go b/internal/config/config.go index ee23829..8cc6287 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,9 @@ package config -import "github.com/mimecast/dtail/internal/source" +import ( + "github.com/mimecast/dtail/internal/constants" + "github.com/mimecast/dtail/internal/source" +) const ( // HealthUser is used for the health check @@ -10,11 +13,11 @@ const ( // ContinuousUser is used for non-interactive continuous mapreduce queries. ContinuousUser string = "DTAIL-CONTINUOUS" // InterruptTimeoutS specifies the Ctrl+C log pause interval. - InterruptTimeoutS int = 3 + InterruptTimeoutS int = constants.InterruptTimeoutSeconds // DefaultConnectionsPerCPU controls how many connections are established concurrently. - DefaultConnectionsPerCPU int = 10 + DefaultConnectionsPerCPU int = constants.DefaultConnectionsPerCPU // DefaultSSHPort is the default DServer port. - DefaultSSHPort int = 2222 + DefaultSSHPort int = constants.DefaultSSHPort // DefaultLogLevel specifies the default log level (obviously) DefaultLogLevel string = "info" // DefaultClientLogger specifies the default logger for the client commands. diff --git a/internal/config/initializer.go b/internal/config/initializer.go index 9724902..825f612 100644 --- a/internal/config/initializer.go +++ b/internal/config/initializer.go @@ -8,6 +8,7 @@ import ( "os" "strings" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/source" ) @@ -83,7 +84,7 @@ func (in *initializer) transformConfig(sourceProcess source.Source, args *Args, func (in *initializer) processEnvVars(args *Args) { if Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { os.Setenv("DTAIL_HOSTNAME_OVERRIDE", "integrationtest") - in.Server.MaxLineLength = 1024 + in.Server.MaxLineLength = constants.DefaultMaxLineLength } sshPrivateKeyPathFile := os.Getenv("DTAIL_SSH_PRIVATE_KEYFILE_PATH") if len(sshPrivateKeyPathFile) > 0 && args.SSHPrivateKeyFilePath == "" { diff --git a/internal/config/server.go b/internal/config/server.go index cb9ca2b..38186fd 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -2,6 +2,8 @@ package config import ( "errors" + + "github.com/mimecast/dtail/internal/constants" ) // Permissions map. Each SSH user has a list of permissions which log files it @@ -74,13 +76,13 @@ func newDefaultServerConfig() *ServerConfig { defaultPermissions := []string{"^/.*"} defaultBindAddress := "0.0.0.0" return &ServerConfig{ - HostKeyBits: 4096, + HostKeyBits: constants.HostKeyBits, HostKeyFile: "./cache/ssh_host_key", MapreduceLogFormat: "default", - MaxConcurrentCats: 2, - MaxConcurrentTails: 50, - MaxConnections: 10, - MaxLineLength: 1024 * 1024, + MaxConcurrentCats: constants.MaxConcurrentCats, + MaxConcurrentTails: constants.MaxConcurrentTails, + MaxConnections: constants.MaxConnections, + MaxLineLength: constants.ServerMaxLineLength, SSHBindAddress: defaultBindAddress, Permissions: Permissions{ Default: defaultPermissions, diff --git a/internal/constants/buffers.go b/internal/constants/buffers.go new file mode 100644 index 0000000..b80586a --- /dev/null +++ b/internal/constants/buffers.go @@ -0,0 +1,19 @@ +package constants + +// Buffer size constants in bytes +const ( + // StatsArraySize is the size of arrays for tracking stats + StatsArraySize = 100 + + // LineBufferInitialCapacity is the initial capacity for line buffers (8KB) + LineBufferInitialCapacity = 8192 + + // ReadBufferSize is the size of read buffers (8KB) + ReadBufferSize = 8192 + + // DefaultChunkSize is the default chunk size for reading (64KB) + DefaultChunkSize = 64 * 1024 + + // InitialBufferSize is the initial buffer size for processors (64KB) + InitialBufferSize = 64 * 1024 +)
\ No newline at end of file diff --git a/internal/constants/channels.go b/internal/constants/channels.go new file mode 100644 index 0000000..9f2d619 --- /dev/null +++ b/internal/constants/channels.go @@ -0,0 +1,14 @@ +package constants + +// Channel buffer size constants +const ( + // DefaultLinesChannelSize is the default buffer size for lines channels + DefaultLinesChannelSize = 100 + + // DefaultServerMessagesChannelSize is the default buffer size for server messages + DefaultServerMessagesChannelSize = 10 + + // LoggerBufferChannelSize is the buffer size for logger channels + // Calculated as runtime.NumCPU() * 100 at runtime + LoggerBufferChannelMultiplier = 100 +)
\ No newline at end of file diff --git a/internal/constants/limits.go b/internal/constants/limits.go new file mode 100644 index 0000000..5b871ce --- /dev/null +++ b/internal/constants/limits.go @@ -0,0 +1,55 @@ +package constants + +// Numeric limits and configuration values +const ( + // MaxFlushRetries is the maximum number of flush retry attempts + MaxFlushRetries = 10 + + // MaxReadCommandRetries is the maximum number of read command retries + MaxReadCommandRetries = 5 + + // DefaultConnectionsPerCPU is the default number of connections per CPU + DefaultConnectionsPerCPU = 10 + + // DefaultSSHPort is the default SSH server port + DefaultSSHPort = 2222 + + // InterruptTimeoutSeconds is the timeout for interrupt handling + InterruptTimeoutSeconds = 3 + + // MaxConcurrentCats is the maximum number of concurrent cat operations + MaxConcurrentCats = 2 + + // MaxConcurrentTails is the maximum number of concurrent tail operations + MaxConcurrentTails = 50 + + // MaxConnections is the maximum total number of connections + MaxConnections = 10 + + // HostKeyBits is the number of bits for SSH host keys + HostKeyBits = 4096 + + // DefaultMaxLineLength is the default maximum line length (1KB) + DefaultMaxLineLength = 1024 + + // ServerMaxLineLength is the server maximum line length (1MB) + ServerMaxLineLength = 1024 * 1024 + + // MaxSymlinkDepth is the maximum depth for following symlinks + MaxSymlinkDepth = 100 + + // DefaultMapReduceRowsLimit is the default limit for MapReduce output rows + DefaultMapReduceRowsLimit = 10 + + // MapReduceUnlimited indicates no limit on MapReduce rows + MapReduceUnlimited = -1 + + // HealthOKStatus is the status code for healthy service + HealthOKStatus = 0 + + // HealthCriticalStatus is the status code for critical health issues + HealthCriticalStatus = 2 + + // PercentageMultiplier is used for percentage calculations + PercentageMultiplier = 100.0 +)
\ No newline at end of file diff --git a/internal/constants/timeouts.go b/internal/constants/timeouts.go new file mode 100644 index 0000000..9373b23 --- /dev/null +++ b/internal/constants/timeouts.go @@ -0,0 +1,60 @@ +package constants + +import "time" + +// Timeout constants used throughout the application +const ( + // ReadTimeout is the timeout for read operations + ReadTimeout = 1 * time.Second + + // WriteTimeout is the timeout for write operations + WriteTimeout = 10 * time.Second + + // HandlerTimeout is the timeout for handler operations + HandlerTimeout = 5 * time.Second + + // SSHConnectionTimeout is the timeout for SSH connection attempts + SSHConnectionTimeout = 30 * time.Second + + // ReconnectSleepDuration is how long to wait before reconnecting + ReconnectSleepDuration = 2 * time.Second + + // StatsTimerDuration is the interval for client stats reporting + StatsTimerDuration = 3 * time.Second + + // ServerStatsTimerDuration is the interval for server stats reporting + ServerStatsTimerDuration = 10 * time.Second + + // DefaultMapReduceInterval is the default interval for MapReduce operations + DefaultMapReduceInterval = 5 * time.Second + + // ProcessorSleepDuration is the sleep duration for processors + ProcessorSleepDuration = 10 * time.Millisecond + + // ProcessorTimeoutDuration is the timeout for processor operations + ProcessorTimeoutDuration = 100 * time.Millisecond + + // MapReduceSleepDuration is the sleep duration in MapReduce aggregation + MapReduceSleepDuration = 100 * time.Millisecond + + // ContinuousJobsStartDelay is the delay before starting continuous jobs + ContinuousJobsStartDelay = 2 * time.Second + + // SchedulerStartDelay is the delay before starting the scheduler + SchedulerStartDelay = 2 * time.Second + + // RetryTimerDuration is the duration for retry operations + RetryTimerDuration = 2 * time.Second + + // SSHDialTimeout is the timeout for SSH dial operations + SSHDialTimeout = 2 * time.Second + + // KnownHostsCallbackTimeout is the timeout for known hosts callback + KnownHostsCallbackTimeout = 2 * time.Second + + // ReadCommandRetryInterval is the interval between read command retries + ReadCommandRetryInterval = 5 * time.Second + + // DayDuration represents 24 hours for date calculations + DayDuration = 24 * time.Hour +)
\ No newline at end of file diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go index 6a09353..b183843 100644 --- a/internal/io/dlog/loggers/file.go +++ b/internal/io/dlog/loggers/file.go @@ -10,6 +10,7 @@ import ( "time" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" ) type fileWriter struct{} @@ -36,7 +37,7 @@ type file struct { func newFile(strategy Strategy) *file { return &file{ - bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100), + bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*constants.LoggerBufferChannelMultiplier), pauseCh: make(chan struct{}), resumeCh: make(chan struct{}), rotateCh: make(chan struct{}), diff --git a/internal/io/fs/aggregateprocessor.go b/internal/io/fs/aggregateprocessor.go index 98d0c31..809f298 100644 --- a/internal/io/fs/aggregateprocessor.go +++ b/internal/io/fs/aggregateprocessor.go @@ -5,6 +5,7 @@ import ( "context" "time" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" @@ -69,7 +70,7 @@ func (p *AggregateLineProcessor) Flush() []byte { if !p.isTailing { // Close the lines channel to signal end of input // Add a small delay to ensure all lines are processed before closing - time.Sleep(10 * time.Millisecond) + time.Sleep(constants.ProcessorSleepDuration) close(p.linesCh) } return nil diff --git a/internal/io/fs/chunkedreader.go b/internal/io/fs/chunkedreader.go index ab78ba1..7775a58 100644 --- a/internal/io/fs/chunkedreader.go +++ b/internal/io/fs/chunkedreader.go @@ -6,12 +6,13 @@ import ( "io" "time" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/pool" ) // Reusable timer to reduce allocations - PBO optimization -var sharedTimer = time.NewTimer(10 * time.Millisecond) +var sharedTimer = time.NewTimer(constants.ProcessorSleepDuration) // ChunkedReader reads data in large chunks and processes it line by line // This replaces the byte-by-byte reading approach for better performance @@ -29,14 +30,14 @@ type ChunkedReader struct { // NewChunkedReader creates a new chunked reader with the specified chunk size func NewChunkedReader(reader io.Reader, chunkSize int) *ChunkedReader { if chunkSize <= 0 { - chunkSize = 64 * 1024 // Default 64KB chunks + chunkSize = constants.DefaultChunkSize // Default 64KB chunks } return &ChunkedReader{ reader: reader, buffer: make([]byte, chunkSize), chunkSize: chunkSize, // PBO optimization: Pre-allocate line buffer - lineBuffer: make([]byte, 0, 8192), // 8KB initial capacity + lineBuffer: make([]byte, 0, constants.LineBufferInitialCapacity), // 8KB initial capacity } } @@ -76,7 +77,7 @@ func (cr *ChunkedReader) ProcessLines(ctx context.Context, rawLines chan *bytes. default: } } - sharedTimer.Reset(10 * time.Millisecond) + sharedTimer.Reset(constants.ProcessorSleepDuration) select { case <-ctx.Done(): return ctx.Err() diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go index 84b78bb..312ac9c 100644 --- a/internal/io/fs/directprocessor.go +++ b/internal/io/fs/directprocessor.go @@ -9,6 +9,7 @@ import ( "time" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/lcontext" ) @@ -77,7 +78,7 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader, // Set buffer size respecting MaxLineLength configuration maxLineLength := config.Server.MaxLineLength - initialBufSize := 64 * 1024 + initialBufSize := constants.InitialBufferSize if maxLineLength < initialBufSize { initialBufSize = maxLineLength } @@ -337,7 +338,7 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro select { case <-ctx.Done(): return ctx.Err() - case <-time.After(100 * time.Millisecond): + case <-time.After(constants.ProcessorTimeoutDuration): // Check if file has grown fileInfo, err := os.Stat(filePath) if err != nil { diff --git a/internal/io/fs/stats.go b/internal/io/fs/stats.go index 4121ff7..1990e53 100644 --- a/internal/io/fs/stats.go +++ b/internal/io/fs/stats.go @@ -1,14 +1,16 @@ package fs +import "github.com/mimecast/dtail/internal/constants" + // Used to calculate how many log lines matched the regular expression // and how many log files could be transmitted from the server to the client. // Hit and transmit percentage takes only the last 100 log lines into calculation. type stats struct { pos int lineCount uint64 - matched [100]bool + matched [constants.StatsArraySize]bool matchCount uint64 - transmitted [100]bool + transmitted [constants.StatsArraySize]bool transmitCount int } @@ -25,7 +27,7 @@ func (f *stats) transmittedPerc() int { // Update bucket position. We only take into consideration the last 100 // lines for stats. func (f *stats) updatePosition() { - f.pos = (f.pos + 1) % 100 + f.pos = (f.pos + 1) % constants.StatsArraySize f.lineCount++ } @@ -63,7 +65,7 @@ func (f *stats) updateLineNotTransmitted() { func percentOf(total float64, value float64) float64 { if total == 0 || total == value { - return 100 + return constants.PercentageMultiplier } - return value / (total / 100.0) + return value / (total / constants.PercentageMultiplier) } diff --git a/internal/io/fs/tailprocessor.go b/internal/io/fs/tailprocessor.go index 3bc9029..55934ce 100644 --- a/internal/io/fs/tailprocessor.go +++ b/internal/io/fs/tailprocessor.go @@ -7,6 +7,7 @@ import ( "time" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) @@ -163,7 +164,7 @@ func (ftp *FollowingTailProcessor) followFile(ctx context.Context, filePath stri func (ftp *FollowingTailProcessor) followReader(ctx context.Context, file *os.File, filePath string) error { // Set buffer size respecting MaxLineLength configuration maxLineLength := config.Server.MaxLineLength - initialBufSize := 64 * 1024 + initialBufSize := constants.InitialBufferSize if maxLineLength < initialBufSize { initialBufSize = maxLineLength } @@ -259,7 +260,7 @@ func (ftp *FollowingTailProcessor) followReader(ctx context.Context, file *os.Fi select { case <-ctx.Done(): return ctx.Err() - case <-time.After(100 * time.Millisecond): + case <-time.After(constants.ProcessorTimeoutDuration): // Continue the loop to check for new content } } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index ddcbc90..dc1a1c2 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -6,6 +6,8 @@ import ( "strconv" "strings" "time" + + "github.com/mimecast/dtail/internal/constants" ) const ( @@ -70,8 +72,8 @@ func NewQuery(queryStr string) (*Query, error) { q := Query{ RawQuery: queryStr, tokens: tokens, - Interval: time.Second * 5, - Limit: -1, + Interval: constants.DefaultMapReduceInterval, + Limit: constants.MapReduceUnlimited, } // If log format is CSV, then use "." as the table. It means, that diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 4f14751..8fabfc9 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr" @@ -167,7 +168,7 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin if noMoreChannels { return } - time.Sleep(time.Millisecond * 100) + time.Sleep(constants.MapReduceSleepDuration) continue } diff --git a/internal/server/continuous.go b/internal/server/continuous.go index ac5c686..0a218d4 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -8,6 +8,7 @@ import ( "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -21,7 +22,7 @@ func newContinuous() *continuous { func (c *continuous) start(ctx context.Context) { dlog.Server.Info("Starting continuous job runner after 2s") - time.Sleep(time.Second * 2) + time.Sleep(constants.ContinuousJobsStartDelay) c.runJobs(ctx) } diff --git a/internal/server/filldates.go b/internal/server/filldates.go index a6b0ba8..e7489ca 100644 --- a/internal/server/filldates.go +++ b/internal/server/filldates.go @@ -3,21 +3,23 @@ package server import ( "strings" "time" + + "github.com/mimecast/dtail/internal/constants" ) func fillDates(str string) string { - yyyesterday := time.Now().Add(3 * -24 * time.Hour).Format("20060102") + yyyesterday := time.Now().Add(-3 * constants.DayDuration).Format("20060102") str = strings.ReplaceAll(str, "$yyyesterday", yyyesterday) - yyesterday := time.Now().Add(2 * -24 * time.Hour).Format("20060102") + yyesterday := time.Now().Add(-2 * constants.DayDuration).Format("20060102") str = strings.ReplaceAll(str, "$yyesterday", yyesterday) - yesterday := time.Now().Add(1 * -24 * time.Hour).Format("20060102") + yesterday := time.Now().Add(-1 * constants.DayDuration).Format("20060102") str = strings.ReplaceAll(str, "$yesterday", yesterday) today := time.Now().Format("20060102") str = strings.ReplaceAll(str, "$today", today) - tomorrow := time.Now().Add(1 * 24 * time.Hour).Format("20060102") + tomorrow := time.Now().Add(1 * constants.DayDuration).Format("20060102") return strings.ReplaceAll(str, "$tomorrow", tomorrow) } diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index e8ce19a..19096bf 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -15,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -117,7 +118,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { pool.RecycleBytesBuffer(line.Content) line.Recycle() - case <-time.After(time.Second): + case <-time.After(constants.ReadTimeout): select { case <-h.done.Done(): err = io.EOF @@ -288,13 +289,13 @@ func (h *baseHandler) flush() { numUnsentMessages := func() int { return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) } - for i := 0; i < 10; i++ { + for i := 0; i < constants.MaxFlushRetries; i++ { if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) return } dlog.Server.Debug(h.user, "Still lines to be sent") - time.Sleep(time.Millisecond * 10) + time.Sleep(constants.ProcessorSleepDuration) } dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages()) } @@ -312,7 +313,7 @@ func (h *baseHandler) shutdown() { select { case <-h.ackCloseReceived: - case <-time.After(time.Second * 5): + case <-time.After(constants.HandlerTimeout): dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") case <-h.done.Done(): } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index c75b9fc..f7568a5 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" @@ -106,7 +107,7 @@ func (r *readCommand) start(ctx context.Context, ltx lcontext.LContext, func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, glob string, re regex.Regex, retries int, queryStr string) { - retryInterval := time.Second * 5 + retryInterval := constants.ReadCommandRetryInterval glob = filepath.Clean(glob) for retryCount := 0; retryCount < retries; retryCount++ { diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index cc51b7e..1f89ef5 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" @@ -24,8 +25,8 @@ func newScheduler() *scheduler { func (s *scheduler) start(ctx context.Context) { dlog.Server.Info("Starting scheduled job runner after 2s") - // First run after just 10s! - time.Sleep(time.Second * 2) + // First run after just 2s! + time.Sleep(constants.SchedulerStartDelay) s.runJobs(ctx) for { select { diff --git a/internal/server/stats.go b/internal/server/stats.go index 99a644a..cdf3f03 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -7,6 +7,7 @@ import ( "time" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" ) @@ -67,7 +68,7 @@ func (s *stats) serverLimitExceeded() error { func (s *stats) start(ctx context.Context) { for { select { - case <-time.NewTimer(time.Second * 10).C: + case <-time.NewTimer(constants.ServerStatsTimerDuration).C: s.logServerStats() case <-ctx.Done(): return @@ -78,7 +79,7 @@ func (s *stats) start(ctx context.Context) { func (s *stats) waitForConnections() { for { select { - case <-time.NewTimer(time.Second).C: + case <-time.NewTimer(constants.ReadTimeout).C: if !s.hasConnections() { return } diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go index 393c4c7..d5f605f 100644 --- a/internal/ssh/client/knownhostscallback.go +++ b/internal/ssh/client/knownhostscallback.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/prompt" @@ -125,7 +126,7 @@ func (c KnownHostsCallback) PromptAddHosts(ctx context.Context) { c.promptAddHosts(hosts) hosts = []unknownHost{} } - case <-time.After(2 * time.Second): + case <-time.After(constants.KnownHostsCallbackTimeout): // Or ask when after 2 seconds no new unknown hosts were added. if len(hosts) > 0 { c.promptAddHosts(hosts) diff --git a/internal/user/server/user.go b/internal/user/server/user.go index abf74f3..2b2d720 100644 --- a/internal/user/server/user.go +++ b/internal/user/server/user.go @@ -8,11 +8,12 @@ import ( "strings" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/constants" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs/permissions" ) -const maxLinkDepth int = 100 +const maxLinkDepth int = constants.MaxSymlinkDepth // User represents an end-user which connected to the server via the DTail client. type User struct { |
