summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-19 17:00:58 +0300
committerPaul Buetow <paul@buetow.org>2025-06-19 17:00:58 +0300
commit655e348870712280eac03d9e32d027e74c119ced (patch)
treefe9546bf96e0d8d7a30d87d7d4936dc4bea31ca8
parent0234fbac3490ccf2b9dca36292ad6459e990e0f5 (diff)
Refactor: Extract magic numbers as constants and reduce client code duplication
- Created internal/constants package with organized constant files: - timeouts.go: All time duration constants (timeouts, intervals, delays) - channels.go: Channel buffer size constants - limits.go: Numeric limits and configuration values - buffers.go: Buffer size constants in bytes - Replaced all magic numbers throughout codebase with named constants: - Time durations (2s, 3s, 5s, 10s, 100ms, 24h) now use descriptive constants - Buffer sizes (8KB, 64KB, 1MB) extracted to constants - Channel buffer sizes and multipliers - Configuration limits (max connections, concurrency, etc.) - Health check status codes - Percentage calculations - Reduced code duplication in client implementations: - Created CommonClient to share functionality between CatClient, GrepClient, and TailClient - All three clients now inherit from CommonClient - Eliminated duplicate makeHandler() and makeCommands() methods - Simplified client constructors This refactoring improves code maintainability by centralizing configuration values and reducing redundant code across similar client implementations.
-rw-r--r--internal/clients/baseclient.go5
-rw-r--r--internal/clients/catclient.go29
-rw-r--r--internal/clients/common.go49
-rw-r--r--internal/clients/grepclient.go29
-rw-r--r--internal/clients/handlers/basehandler.go3
-rw-r--r--internal/clients/handlers/healthhandler.go5
-rw-r--r--internal/clients/maprclient.go7
-rw-r--r--internal/clients/stats.go11
-rw-r--r--internal/clients/tailclient.go31
-rw-r--r--internal/config/config.go11
-rw-r--r--internal/config/initializer.go3
-rw-r--r--internal/config/server.go12
-rw-r--r--internal/constants/buffers.go19
-rw-r--r--internal/constants/channels.go14
-rw-r--r--internal/constants/limits.go55
-rw-r--r--internal/constants/timeouts.go60
-rw-r--r--internal/io/dlog/loggers/file.go3
-rw-r--r--internal/io/fs/aggregateprocessor.go3
-rw-r--r--internal/io/fs/chunkedreader.go9
-rw-r--r--internal/io/fs/directprocessor.go5
-rw-r--r--internal/io/fs/stats.go12
-rw-r--r--internal/io/fs/tailprocessor.go5
-rw-r--r--internal/mapr/query.go6
-rw-r--r--internal/mapr/server/aggregate.go3
-rw-r--r--internal/server/continuous.go3
-rw-r--r--internal/server/filldates.go10
-rw-r--r--internal/server/handlers/basehandler.go9
-rw-r--r--internal/server/handlers/readcommand.go3
-rw-r--r--internal/server/scheduler.go5
-rw-r--r--internal/server/stats.go5
-rw-r--r--internal/ssh/client/knownhostscallback.go3
-rw-r--r--internal/user/server/user.go3
32 files changed, 290 insertions, 140 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 29a9cfc..0edae86 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -7,6 +7,7 @@ import (
"github.com/mimecast/dtail/internal/clients/connectors"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/discovery"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/regex"
@@ -16,7 +17,7 @@ import (
)
// Reusable timer for retry delays - PBO optimization
-var retryTimer = time.NewTimer(2 * time.Second)
+var retryTimer = time.NewTimer(constants.RetryTimerDuration)
// This is the main client data structure.
type baseClient struct {
@@ -135,7 +136,7 @@ func (c *baseClient) startConnection(ctx context.Context, i int,
default:
}
}
- retryTimer.Reset(2 * time.Second)
+ retryTimer.Reset(constants.RetryTimerDuration)
select {
case <-retryTimer.C:
case <-ctx.Done():
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index bd65560..39f56d6 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -2,19 +2,14 @@ package clients
import (
"errors"
- "fmt"
- "runtime"
- "strings"
- "github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
// CatClient is a client for returning a whole file from the beginning to the end.
type CatClient struct {
- baseClient
+ CommonClient
}
// NewCatClient returns a new cat client.
@@ -25,30 +20,10 @@ func NewCatClient(args config.Args) (*CatClient, error) {
args.Mode = omode.CatClient
c := CatClient{
- baseClient: baseClient{
- Args: args,
- throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
- retry: false,
- },
+ CommonClient: NewCommonClient(args, false),
}
c.init()
c.makeConnections(c)
return &c, nil
}
-
-func (c CatClient) makeHandler(server string) handlers.Handler {
- return handlers.NewClientHandler(server)
-}
-
-func (c CatClient) makeCommands() (commands []string) {
- regex, err := c.Regex.Serialize()
- if err != nil {
- dlog.Client.FatalPanic(err)
- }
- for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(), c.Args.SerializeOptions(), file, regex))
- }
- return
-}
diff --git a/internal/clients/common.go b/internal/clients/common.go
new file mode 100644
index 0000000..2f35412
--- /dev/null
+++ b/internal/clients/common.go
@@ -0,0 +1,49 @@
+package clients
+
+import (
+ "fmt"
+ "runtime"
+ "strings"
+
+ "github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/omode"
+)
+
+// CommonClient provides shared functionality for CatClient, GrepClient, and TailClient
+type CommonClient struct {
+ baseClient
+}
+
+// NewCommonClient creates a new common client with the specified configuration
+func NewCommonClient(args config.Args, retry bool) CommonClient {
+ return CommonClient{
+ baseClient: baseClient{
+ Args: args,
+ throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
+ retry: retry,
+ },
+ }
+}
+
+// makeHandler returns a standard client handler
+func (c CommonClient) makeHandler(server string) handlers.Handler {
+ return handlers.NewClientHandler(server)
+}
+
+// makeCommands generates commands based on the client mode
+func (c CommonClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
+ for _, file := range strings.Split(c.What, ",") {
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s",
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
+ }
+ if c.Mode == omode.TailClient {
+ dlog.Client.Debug(commands)
+ }
+ return
+} \ No newline at end of file
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index 7521c67..71f0220 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -2,20 +2,15 @@ package clients
import (
"errors"
- "fmt"
- "runtime"
- "strings"
- "github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
// GrepClient searches a remote file for all lines matching a regular
// expression. Only the matching lines are displayed.
type GrepClient struct {
- baseClient
+ CommonClient
}
// NewGrepClient creates a new grep client.
@@ -26,30 +21,10 @@ func NewGrepClient(args config.Args) (*GrepClient, error) {
args.Mode = omode.GrepClient
c := GrepClient{
- baseClient: baseClient{
- Args: args,
- throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
- retry: false,
- },
+ CommonClient: NewCommonClient(args, false),
}
c.init()
c.makeConnections(c)
return &c, nil
}
-
-func (c GrepClient) makeHandler(server string) handlers.Handler {
- return handlers.NewClientHandler(server)
-}
-
-func (c GrepClient) makeCommands() (commands []string) {
- regex, err := c.Regex.Serialize()
- if err != nil {
- dlog.Client.FatalPanic(err)
- }
- for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(), c.Args.SerializeOptions(), file, regex))
- }
- return
-}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 6f637a7..20e5fe3 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -47,7 +48,7 @@ func (h *baseHandler) SendMessage(command string) error {
select {
case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
- case <-time.After(time.Second * 5):
+ case <-time.After(constants.HandlerTimeout):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
case <-h.Done():
return nil
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 47b594e..e602c39 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -4,6 +4,7 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -22,7 +23,7 @@ func NewHealthHandler(server string) *HealthHandler {
server: server,
shellStarted: false,
commands: make(chan string),
- status: 2, // Assume CRITICAL status by default.
+ status: constants.HealthCriticalStatus, // Assume CRITICAL status by default.
done: internal.NewDone(),
},
}
@@ -51,6 +52,6 @@ func (h *HealthHandler) handleMessage(message string) {
s := strings.Split(message, protocol.FieldDelimiter)
message = s[len(s)-1]
if message == "OK" {
- h.baseHandler.status = 0
+ h.baseHandler.status = constants.HealthOKStatus
}
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 6362028..1f587ce 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -11,6 +11,7 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
@@ -164,12 +165,12 @@ func (c *MaprClient) printResults() {
var result string
var err error
var numRows int
- rowsLimit := -1
+ rowsLimit := constants.MapReduceUnlimited
- if c.query.Limit == -1 {
+ if c.query.Limit == constants.MapReduceUnlimited {
// Limit output to 10 rows when the result is printed to stdout.
// This can be overriden with the limit clause though.
- rowsLimit = 10
+ rowsLimit = constants.DefaultMapReduceRowsLimit
}
if c.cumulative {
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 9a17899..7160357 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -10,12 +10,13 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
// Reusable timer to reduce allocations - PBO optimization
-var statsTimer = time.NewTimer(3 * time.Second)
+var statsTimer = time.NewTimer(constants.StatsTimerDuration)
// Used to collect and display various client stats.
type stats struct {
@@ -55,7 +56,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{},
default:
}
}
- statsTimer.Reset(3 * time.Second)
+ statsTimer.Reset(constants.StatsTimerDuration)
select {
case message := <-statsCh:
@@ -104,7 +105,7 @@ func (s *stats) printStatsDueInterrupt(messages []string) {
}
fmt.Println(fmt.Sprintf(" %s", message))
}
- time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS))
+ time.Sleep(time.Second * time.Duration(constants.InterruptTimeoutSeconds))
dlog.Client.Resume()
}
@@ -149,7 +150,7 @@ func (s *stats) numConnected() int {
func percentOf(total float64, value float64) float64 {
if total == 0 || total == value {
- return 100
+ return constants.PercentageMultiplier
}
- return value / (total / 100.0)
+ return value / (total / constants.PercentageMultiplier)
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 35c01d4..f71da41 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -1,50 +1,23 @@
package clients
import (
- "fmt"
- "runtime"
- "strings"
-
- "github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
// TailClient is used for tailing remote log files (opening, seeking to the end and returning only new incoming lines).
type TailClient struct {
- baseClient
+ CommonClient
}
// NewTailClient returns a new TailClient.
func NewTailClient(args config.Args) (*TailClient, error) {
args.Mode = omode.TailClient
c := TailClient{
- baseClient: baseClient{
- Args: args,
- throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
- retry: true,
- },
+ CommonClient: NewCommonClient(args, true),
}
c.init()
c.makeConnections(c)
return &c, nil
}
-
-func (c TailClient) makeHandler(server string) handlers.Handler {
- return handlers.NewClientHandler(server)
-}
-
-func (c TailClient) makeCommands() (commands []string) {
- regex, err := c.Regex.Serialize()
- if err != nil {
- dlog.Client.FatalPanic(err)
- }
- for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s",
- c.Mode.String(), c.Args.SerializeOptions(), file, regex))
- }
- dlog.Client.Debug(commands)
- return
-}
diff --git a/internal/config/config.go b/internal/config/config.go
index ee23829..8cc6287 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1,6 +1,9 @@
package config
-import "github.com/mimecast/dtail/internal/source"
+import (
+ "github.com/mimecast/dtail/internal/constants"
+ "github.com/mimecast/dtail/internal/source"
+)
const (
// HealthUser is used for the health check
@@ -10,11 +13,11 @@ const (
// ContinuousUser is used for non-interactive continuous mapreduce queries.
ContinuousUser string = "DTAIL-CONTINUOUS"
// InterruptTimeoutS specifies the Ctrl+C log pause interval.
- InterruptTimeoutS int = 3
+ InterruptTimeoutS int = constants.InterruptTimeoutSeconds
// DefaultConnectionsPerCPU controls how many connections are established concurrently.
- DefaultConnectionsPerCPU int = 10
+ DefaultConnectionsPerCPU int = constants.DefaultConnectionsPerCPU
// DefaultSSHPort is the default DServer port.
- DefaultSSHPort int = 2222
+ DefaultSSHPort int = constants.DefaultSSHPort
// DefaultLogLevel specifies the default log level (obviously)
DefaultLogLevel string = "info"
// DefaultClientLogger specifies the default logger for the client commands.
diff --git a/internal/config/initializer.go b/internal/config/initializer.go
index 9724902..825f612 100644
--- a/internal/config/initializer.go
+++ b/internal/config/initializer.go
@@ -8,6 +8,7 @@ import (
"os"
"strings"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/source"
)
@@ -83,7 +84,7 @@ func (in *initializer) transformConfig(sourceProcess source.Source, args *Args,
func (in *initializer) processEnvVars(args *Args) {
if Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
os.Setenv("DTAIL_HOSTNAME_OVERRIDE", "integrationtest")
- in.Server.MaxLineLength = 1024
+ in.Server.MaxLineLength = constants.DefaultMaxLineLength
}
sshPrivateKeyPathFile := os.Getenv("DTAIL_SSH_PRIVATE_KEYFILE_PATH")
if len(sshPrivateKeyPathFile) > 0 && args.SSHPrivateKeyFilePath == "" {
diff --git a/internal/config/server.go b/internal/config/server.go
index cb9ca2b..38186fd 100644
--- a/internal/config/server.go
+++ b/internal/config/server.go
@@ -2,6 +2,8 @@ package config
import (
"errors"
+
+ "github.com/mimecast/dtail/internal/constants"
)
// Permissions map. Each SSH user has a list of permissions which log files it
@@ -74,13 +76,13 @@ func newDefaultServerConfig() *ServerConfig {
defaultPermissions := []string{"^/.*"}
defaultBindAddress := "0.0.0.0"
return &ServerConfig{
- HostKeyBits: 4096,
+ HostKeyBits: constants.HostKeyBits,
HostKeyFile: "./cache/ssh_host_key",
MapreduceLogFormat: "default",
- MaxConcurrentCats: 2,
- MaxConcurrentTails: 50,
- MaxConnections: 10,
- MaxLineLength: 1024 * 1024,
+ MaxConcurrentCats: constants.MaxConcurrentCats,
+ MaxConcurrentTails: constants.MaxConcurrentTails,
+ MaxConnections: constants.MaxConnections,
+ MaxLineLength: constants.ServerMaxLineLength,
SSHBindAddress: defaultBindAddress,
Permissions: Permissions{
Default: defaultPermissions,
diff --git a/internal/constants/buffers.go b/internal/constants/buffers.go
new file mode 100644
index 0000000..b80586a
--- /dev/null
+++ b/internal/constants/buffers.go
@@ -0,0 +1,19 @@
+package constants
+
+// Buffer size constants in bytes
+const (
+ // StatsArraySize is the size of arrays for tracking stats
+ StatsArraySize = 100
+
+ // LineBufferInitialCapacity is the initial capacity for line buffers (8KB)
+ LineBufferInitialCapacity = 8192
+
+ // ReadBufferSize is the size of read buffers (8KB)
+ ReadBufferSize = 8192
+
+ // DefaultChunkSize is the default chunk size for reading (64KB)
+ DefaultChunkSize = 64 * 1024
+
+ // InitialBufferSize is the initial buffer size for processors (64KB)
+ InitialBufferSize = 64 * 1024
+) \ No newline at end of file
diff --git a/internal/constants/channels.go b/internal/constants/channels.go
new file mode 100644
index 0000000..9f2d619
--- /dev/null
+++ b/internal/constants/channels.go
@@ -0,0 +1,14 @@
+package constants
+
+// Channel buffer size constants
+const (
+ // DefaultLinesChannelSize is the default buffer size for lines channels
+ DefaultLinesChannelSize = 100
+
+ // DefaultServerMessagesChannelSize is the default buffer size for server messages
+ DefaultServerMessagesChannelSize = 10
+
+ // LoggerBufferChannelSize is the buffer size for logger channels
+ // Calculated as runtime.NumCPU() * 100 at runtime
+ LoggerBufferChannelMultiplier = 100
+) \ No newline at end of file
diff --git a/internal/constants/limits.go b/internal/constants/limits.go
new file mode 100644
index 0000000..5b871ce
--- /dev/null
+++ b/internal/constants/limits.go
@@ -0,0 +1,55 @@
+package constants
+
+// Numeric limits and configuration values
+const (
+ // MaxFlushRetries is the maximum number of flush retry attempts
+ MaxFlushRetries = 10
+
+ // MaxReadCommandRetries is the maximum number of read command retries
+ MaxReadCommandRetries = 5
+
+ // DefaultConnectionsPerCPU is the default number of connections per CPU
+ DefaultConnectionsPerCPU = 10
+
+ // DefaultSSHPort is the default SSH server port
+ DefaultSSHPort = 2222
+
+ // InterruptTimeoutSeconds is the timeout for interrupt handling
+ InterruptTimeoutSeconds = 3
+
+ // MaxConcurrentCats is the maximum number of concurrent cat operations
+ MaxConcurrentCats = 2
+
+ // MaxConcurrentTails is the maximum number of concurrent tail operations
+ MaxConcurrentTails = 50
+
+ // MaxConnections is the maximum total number of connections
+ MaxConnections = 10
+
+ // HostKeyBits is the number of bits for SSH host keys
+ HostKeyBits = 4096
+
+ // DefaultMaxLineLength is the default maximum line length (1KB)
+ DefaultMaxLineLength = 1024
+
+ // ServerMaxLineLength is the server maximum line length (1MB)
+ ServerMaxLineLength = 1024 * 1024
+
+ // MaxSymlinkDepth is the maximum depth for following symlinks
+ MaxSymlinkDepth = 100
+
+ // DefaultMapReduceRowsLimit is the default limit for MapReduce output rows
+ DefaultMapReduceRowsLimit = 10
+
+ // MapReduceUnlimited indicates no limit on MapReduce rows
+ MapReduceUnlimited = -1
+
+ // HealthOKStatus is the status code for healthy service
+ HealthOKStatus = 0
+
+ // HealthCriticalStatus is the status code for critical health issues
+ HealthCriticalStatus = 2
+
+ // PercentageMultiplier is used for percentage calculations
+ PercentageMultiplier = 100.0
+) \ No newline at end of file
diff --git a/internal/constants/timeouts.go b/internal/constants/timeouts.go
new file mode 100644
index 0000000..9373b23
--- /dev/null
+++ b/internal/constants/timeouts.go
@@ -0,0 +1,60 @@
+package constants
+
+import "time"
+
+// Timeout constants used throughout the application
+const (
+ // ReadTimeout is the timeout for read operations
+ ReadTimeout = 1 * time.Second
+
+ // WriteTimeout is the timeout for write operations
+ WriteTimeout = 10 * time.Second
+
+ // HandlerTimeout is the timeout for handler operations
+ HandlerTimeout = 5 * time.Second
+
+ // SSHConnectionTimeout is the timeout for SSH connection attempts
+ SSHConnectionTimeout = 30 * time.Second
+
+ // ReconnectSleepDuration is how long to wait before reconnecting
+ ReconnectSleepDuration = 2 * time.Second
+
+ // StatsTimerDuration is the interval for client stats reporting
+ StatsTimerDuration = 3 * time.Second
+
+ // ServerStatsTimerDuration is the interval for server stats reporting
+ ServerStatsTimerDuration = 10 * time.Second
+
+ // DefaultMapReduceInterval is the default interval for MapReduce operations
+ DefaultMapReduceInterval = 5 * time.Second
+
+ // ProcessorSleepDuration is the sleep duration for processors
+ ProcessorSleepDuration = 10 * time.Millisecond
+
+ // ProcessorTimeoutDuration is the timeout for processor operations
+ ProcessorTimeoutDuration = 100 * time.Millisecond
+
+ // MapReduceSleepDuration is the sleep duration in MapReduce aggregation
+ MapReduceSleepDuration = 100 * time.Millisecond
+
+ // ContinuousJobsStartDelay is the delay before starting continuous jobs
+ ContinuousJobsStartDelay = 2 * time.Second
+
+ // SchedulerStartDelay is the delay before starting the scheduler
+ SchedulerStartDelay = 2 * time.Second
+
+ // RetryTimerDuration is the duration for retry operations
+ RetryTimerDuration = 2 * time.Second
+
+ // SSHDialTimeout is the timeout for SSH dial operations
+ SSHDialTimeout = 2 * time.Second
+
+ // KnownHostsCallbackTimeout is the timeout for known hosts callback
+ KnownHostsCallbackTimeout = 2 * time.Second
+
+ // ReadCommandRetryInterval is the interval between read command retries
+ ReadCommandRetryInterval = 5 * time.Second
+
+ // DayDuration represents 24 hours for date calculations
+ DayDuration = 24 * time.Hour
+) \ No newline at end of file
diff --git a/internal/io/dlog/loggers/file.go b/internal/io/dlog/loggers/file.go
index 6a09353..b183843 100644
--- a/internal/io/dlog/loggers/file.go
+++ b/internal/io/dlog/loggers/file.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
)
type fileWriter struct{}
@@ -36,7 +37,7 @@ type file struct {
func newFile(strategy Strategy) *file {
return &file{
- bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*100),
+ bufferCh: make(chan *fileMessageBuf, runtime.NumCPU()*constants.LoggerBufferChannelMultiplier),
pauseCh: make(chan struct{}),
resumeCh: make(chan struct{}),
rotateCh: make(chan struct{}),
diff --git a/internal/io/fs/aggregateprocessor.go b/internal/io/fs/aggregateprocessor.go
index 98d0c31..809f298 100644
--- a/internal/io/fs/aggregateprocessor.go
+++ b/internal/io/fs/aggregateprocessor.go
@@ -5,6 +5,7 @@ import (
"context"
"time"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
@@ -69,7 +70,7 @@ func (p *AggregateLineProcessor) Flush() []byte {
if !p.isTailing {
// Close the lines channel to signal end of input
// Add a small delay to ensure all lines are processed before closing
- time.Sleep(10 * time.Millisecond)
+ time.Sleep(constants.ProcessorSleepDuration)
close(p.linesCh)
}
return nil
diff --git a/internal/io/fs/chunkedreader.go b/internal/io/fs/chunkedreader.go
index ab78ba1..7775a58 100644
--- a/internal/io/fs/chunkedreader.go
+++ b/internal/io/fs/chunkedreader.go
@@ -6,12 +6,13 @@ import (
"io"
"time"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
)
// Reusable timer to reduce allocations - PBO optimization
-var sharedTimer = time.NewTimer(10 * time.Millisecond)
+var sharedTimer = time.NewTimer(constants.ProcessorSleepDuration)
// ChunkedReader reads data in large chunks and processes it line by line
// This replaces the byte-by-byte reading approach for better performance
@@ -29,14 +30,14 @@ type ChunkedReader struct {
// NewChunkedReader creates a new chunked reader with the specified chunk size
func NewChunkedReader(reader io.Reader, chunkSize int) *ChunkedReader {
if chunkSize <= 0 {
- chunkSize = 64 * 1024 // Default 64KB chunks
+ chunkSize = constants.DefaultChunkSize // Default 64KB chunks
}
return &ChunkedReader{
reader: reader,
buffer: make([]byte, chunkSize),
chunkSize: chunkSize,
// PBO optimization: Pre-allocate line buffer
- lineBuffer: make([]byte, 0, 8192), // 8KB initial capacity
+ lineBuffer: make([]byte, 0, constants.LineBufferInitialCapacity), // 8KB initial capacity
}
}
@@ -76,7 +77,7 @@ func (cr *ChunkedReader) ProcessLines(ctx context.Context, rawLines chan *bytes.
default:
}
}
- sharedTimer.Reset(10 * time.Millisecond)
+ sharedTimer.Reset(constants.ProcessorSleepDuration)
select {
case <-ctx.Done():
return ctx.Err()
diff --git a/internal/io/fs/directprocessor.go b/internal/io/fs/directprocessor.go
index 84b78bb..312ac9c 100644
--- a/internal/io/fs/directprocessor.go
+++ b/internal/io/fs/directprocessor.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/lcontext"
)
@@ -77,7 +78,7 @@ func (dp *DirectProcessor) ProcessReader(ctx context.Context, reader io.Reader,
// Set buffer size respecting MaxLineLength configuration
maxLineLength := config.Server.MaxLineLength
- initialBufSize := 64 * 1024
+ initialBufSize := constants.InitialBufferSize
if maxLineLength < initialBufSize {
initialBufSize = maxLineLength
}
@@ -337,7 +338,7 @@ func (dp *DirectProcessor) followFile(ctx context.Context, filePath string) erro
select {
case <-ctx.Done():
return ctx.Err()
- case <-time.After(100 * time.Millisecond):
+ case <-time.After(constants.ProcessorTimeoutDuration):
// Check if file has grown
fileInfo, err := os.Stat(filePath)
if err != nil {
diff --git a/internal/io/fs/stats.go b/internal/io/fs/stats.go
index 4121ff7..1990e53 100644
--- a/internal/io/fs/stats.go
+++ b/internal/io/fs/stats.go
@@ -1,14 +1,16 @@
package fs
+import "github.com/mimecast/dtail/internal/constants"
+
// Used to calculate how many log lines matched the regular expression
// and how many log files could be transmitted from the server to the client.
// Hit and transmit percentage takes only the last 100 log lines into calculation.
type stats struct {
pos int
lineCount uint64
- matched [100]bool
+ matched [constants.StatsArraySize]bool
matchCount uint64
- transmitted [100]bool
+ transmitted [constants.StatsArraySize]bool
transmitCount int
}
@@ -25,7 +27,7 @@ func (f *stats) transmittedPerc() int {
// Update bucket position. We only take into consideration the last 100
// lines for stats.
func (f *stats) updatePosition() {
- f.pos = (f.pos + 1) % 100
+ f.pos = (f.pos + 1) % constants.StatsArraySize
f.lineCount++
}
@@ -63,7 +65,7 @@ func (f *stats) updateLineNotTransmitted() {
func percentOf(total float64, value float64) float64 {
if total == 0 || total == value {
- return 100
+ return constants.PercentageMultiplier
}
- return value / (total / 100.0)
+ return value / (total / constants.PercentageMultiplier)
}
diff --git a/internal/io/fs/tailprocessor.go b/internal/io/fs/tailprocessor.go
index 3bc9029..55934ce 100644
--- a/internal/io/fs/tailprocessor.go
+++ b/internal/io/fs/tailprocessor.go
@@ -7,6 +7,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
@@ -163,7 +164,7 @@ func (ftp *FollowingTailProcessor) followFile(ctx context.Context, filePath stri
func (ftp *FollowingTailProcessor) followReader(ctx context.Context, file *os.File, filePath string) error {
// Set buffer size respecting MaxLineLength configuration
maxLineLength := config.Server.MaxLineLength
- initialBufSize := 64 * 1024
+ initialBufSize := constants.InitialBufferSize
if maxLineLength < initialBufSize {
initialBufSize = maxLineLength
}
@@ -259,7 +260,7 @@ func (ftp *FollowingTailProcessor) followReader(ctx context.Context, file *os.Fi
select {
case <-ctx.Done():
return ctx.Err()
- case <-time.After(100 * time.Millisecond):
+ case <-time.After(constants.ProcessorTimeoutDuration):
// Continue the loop to check for new content
}
}
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index ddcbc90..dc1a1c2 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -6,6 +6,8 @@ import (
"strconv"
"strings"
"time"
+
+ "github.com/mimecast/dtail/internal/constants"
)
const (
@@ -70,8 +72,8 @@ func NewQuery(queryStr string) (*Query, error) {
q := Query{
RawQuery: queryStr,
tokens: tokens,
- Interval: time.Second * 5,
- Limit: -1,
+ Interval: constants.DefaultMapReduceInterval,
+ Limit: constants.MapReduceUnlimited,
}
// If log format is CSV, then use "." as the table. It means, that
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 4f14751..8fabfc9 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -7,6 +7,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/mapr"
@@ -167,7 +168,7 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
if noMoreChannels {
return
}
- time.Sleep(time.Millisecond * 100)
+ time.Sleep(constants.MapReduceSleepDuration)
continue
}
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index ac5c686..0a218d4 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -8,6 +8,7 @@ import (
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
@@ -21,7 +22,7 @@ func newContinuous() *continuous {
func (c *continuous) start(ctx context.Context) {
dlog.Server.Info("Starting continuous job runner after 2s")
- time.Sleep(time.Second * 2)
+ time.Sleep(constants.ContinuousJobsStartDelay)
c.runJobs(ctx)
}
diff --git a/internal/server/filldates.go b/internal/server/filldates.go
index a6b0ba8..e7489ca 100644
--- a/internal/server/filldates.go
+++ b/internal/server/filldates.go
@@ -3,21 +3,23 @@ package server
import (
"strings"
"time"
+
+ "github.com/mimecast/dtail/internal/constants"
)
func fillDates(str string) string {
- yyyesterday := time.Now().Add(3 * -24 * time.Hour).Format("20060102")
+ yyyesterday := time.Now().Add(-3 * constants.DayDuration).Format("20060102")
str = strings.ReplaceAll(str, "$yyyesterday", yyyesterday)
- yyesterday := time.Now().Add(2 * -24 * time.Hour).Format("20060102")
+ yyesterday := time.Now().Add(-2 * constants.DayDuration).Format("20060102")
str = strings.ReplaceAll(str, "$yyesterday", yyesterday)
- yesterday := time.Now().Add(1 * -24 * time.Hour).Format("20060102")
+ yesterday := time.Now().Add(-1 * constants.DayDuration).Format("20060102")
str = strings.ReplaceAll(str, "$yesterday", yesterday)
today := time.Now().Format("20060102")
str = strings.ReplaceAll(str, "$today", today)
- tomorrow := time.Now().Add(1 * 24 * time.Hour).Format("20060102")
+ tomorrow := time.Now().Add(1 * constants.DayDuration).Format("20060102")
return strings.ReplaceAll(str, "$tomorrow", tomorrow)
}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index e8ce19a..19096bf 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -15,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -117,7 +118,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
pool.RecycleBytesBuffer(line.Content)
line.Recycle()
- case <-time.After(time.Second):
+ case <-time.After(constants.ReadTimeout):
select {
case <-h.done.Done():
err = io.EOF
@@ -288,13 +289,13 @@ func (h *baseHandler) flush() {
numUnsentMessages := func() int {
return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
- for i := 0; i < 10; i++ {
+ for i := 0; i < constants.MaxFlushRetries; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
}
dlog.Server.Debug(h.user, "Still lines to be sent")
- time.Sleep(time.Millisecond * 10)
+ time.Sleep(constants.ProcessorSleepDuration)
}
dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages())
}
@@ -312,7 +313,7 @@ func (h *baseHandler) shutdown() {
select {
case <-h.ackCloseReceived:
- case <-time.After(time.Second * 5):
+ case <-time.After(constants.HandlerTimeout):
dlog.Server.Debug(h.user, "Shutdown timeout reached, enforcing shutdown")
case <-h.done.Done():
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index c75b9fc..f7568a5 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -8,6 +8,7 @@ import (
"strings"
"time"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -106,7 +107,7 @@ func (r *readCommand) start(ctx context.Context, ltx lcontext.LContext,
func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
glob string, re regex.Regex, retries int, queryStr string) {
- retryInterval := time.Second * 5
+ retryInterval := constants.ReadCommandRetryInterval
glob = filepath.Clean(glob)
for retryCount := 0; retryCount < retries; retryCount++ {
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index cc51b7e..1f89ef5 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -10,6 +10,7 @@ import (
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
@@ -24,8 +25,8 @@ func newScheduler() *scheduler {
func (s *scheduler) start(ctx context.Context) {
dlog.Server.Info("Starting scheduled job runner after 2s")
- // First run after just 10s!
- time.Sleep(time.Second * 2)
+ // First run after just 2s!
+ time.Sleep(constants.SchedulerStartDelay)
s.runJobs(ctx)
for {
select {
diff --git a/internal/server/stats.go b/internal/server/stats.go
index 99a644a..cdf3f03 100644
--- a/internal/server/stats.go
+++ b/internal/server/stats.go
@@ -7,6 +7,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
)
@@ -67,7 +68,7 @@ func (s *stats) serverLimitExceeded() error {
func (s *stats) start(ctx context.Context) {
for {
select {
- case <-time.NewTimer(time.Second * 10).C:
+ case <-time.NewTimer(constants.ServerStatsTimerDuration).C:
s.logServerStats()
case <-ctx.Done():
return
@@ -78,7 +79,7 @@ func (s *stats) start(ctx context.Context) {
func (s *stats) waitForConnections() {
for {
select {
- case <-time.NewTimer(time.Second).C:
+ case <-time.NewTimer(constants.ReadTimeout).C:
if !s.hasConnections() {
return
}
diff --git a/internal/ssh/client/knownhostscallback.go b/internal/ssh/client/knownhostscallback.go
index 393c4c7..d5f605f 100644
--- a/internal/ssh/client/knownhostscallback.go
+++ b/internal/ssh/client/knownhostscallback.go
@@ -10,6 +10,7 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/prompt"
@@ -125,7 +126,7 @@ func (c KnownHostsCallback) PromptAddHosts(ctx context.Context) {
c.promptAddHosts(hosts)
hosts = []unknownHost{}
}
- case <-time.After(2 * time.Second):
+ case <-time.After(constants.KnownHostsCallbackTimeout):
// Or ask when after 2 seconds no new unknown hosts were added.
if len(hosts) > 0 {
c.promptAddHosts(hosts)
diff --git a/internal/user/server/user.go b/internal/user/server/user.go
index abf74f3..2b2d720 100644
--- a/internal/user/server/user.go
+++ b/internal/user/server/user.go
@@ -8,11 +8,12 @@ import (
"strings"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/constants"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs/permissions"
)
-const maxLinkDepth int = 100
+const maxLinkDepth int = constants.MaxSymlinkDepth
// User represents an end-user which connected to the server via the DTail client.
type User struct {