summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 08:55:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 08:55:50 +0200
commit50a40f6e77e9f9a6f65e0596c789f67b91f6a6e1 (patch)
treea1eff17c9d0d8afbf7eb55e9f2593c2647c5b62d /internal
parentbbbb7461d19e611e6fab3f24edd5f8e0d2d45b1e (diff)
Refactor server path to use injected runtime config (task 329)
Diffstat (limited to 'internal')
-rw-r--r--internal/clients/connectors/serverless.go23
-rw-r--r--internal/config/runtime.go18
-rw-r--r--internal/server/continuous.go12
-rw-r--r--internal/server/handlers/mapcommand.go9
-rw-r--r--internal/server/handlers/readcommand.go43
-rw-r--r--internal/server/handlers/serverhandler.go10
-rw-r--r--internal/server/scheduler.go12
-rw-r--r--internal/server/server.go31
-rw-r--r--internal/server/stats.go12
9 files changed, 105 insertions, 65 deletions
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index eaaa770..74cd9e6 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -75,6 +75,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
user,
make(chan struct{}, config.Server.MaxConcurrentCats),
make(chan struct{}, config.Server.MaxConcurrentTails),
+ config.Server,
)
}
@@ -86,14 +87,14 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
// Use buffered channels to prevent deadlock
// This approach avoids the circular dependency of direct io.Copy
-
+
// Channels for data flow
toServer := make(chan []byte, 100)
fromServer := make(chan []byte, 100)
-
+
// Error tracking
errChan := make(chan error, 4)
-
+
// Read from client handler
go func() {
defer close(toServer)
@@ -117,7 +118,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
}
}
}()
-
+
// Write to server handler
go func() {
for data := range toServer {
@@ -127,7 +128,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
}
}
}()
-
+
// Read from server handler
go func() {
defer close(fromServer)
@@ -151,7 +152,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
}
}
}()
-
+
// Write to client handler
serverDone := make(chan struct{})
go func() {
@@ -163,7 +164,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
}
}
}()
-
+
// Send commands after setting up the data flow
for _, command := range s.commands {
dlog.Client.Debug("Sending command to serverless server", command)
@@ -171,7 +172,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
dlog.Client.Debug(err)
}
}
-
+
// Monitor for completion
go func() {
defer terminate()
@@ -184,17 +185,17 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
dlog.Client.Trace("<-ctx.Done()")
}
}()
-
+
// Wait for completion
<-ctx.Done()
-
+
// Check for errors
select {
case err := <-errChan:
return err
default:
}
-
+
s.handler.Shutdown()
return nil
}
diff --git a/internal/config/runtime.go b/internal/config/runtime.go
new file mode 100644
index 0000000..1e19265
--- /dev/null
+++ b/internal/config/runtime.go
@@ -0,0 +1,18 @@
+package config
+
+// RuntimeConfig contains the active runtime configuration for a process.
+// It is intended to be injected into components instead of relying on package globals.
+type RuntimeConfig struct {
+ Client *ClientConfig
+ Server *ServerConfig
+ Common *CommonConfig
+}
+
+// CurrentRuntime returns the currently initialized runtime configuration.
+func CurrentRuntime() RuntimeConfig {
+ return RuntimeConfig{
+ Client: Client,
+ Server: Server,
+ Common: Common,
+ }
+}
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index ac5c686..90e5f8a 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -13,10 +13,12 @@ import (
gossh "golang.org/x/crypto/ssh"
)
-type continuous struct{}
+type continuous struct {
+ cfg config.RuntimeConfig
+}
-func newContinuous() *continuous {
- return &continuous{}
+func newContinuous(cfg config.RuntimeConfig) *continuous {
+ return &continuous{cfg: cfg}
}
func (c *continuous) start(ctx context.Context) {
@@ -26,7 +28,7 @@ func (c *continuous) start(ctx context.Context) {
}
func (c *continuous) runJobs(ctx context.Context) {
- for _, job := range config.Server.Continuous {
+ for _, job := range c.cfg.Server.Continuous {
if !job.Enable {
dlog.Server.Debug(job.Name, "Not running job as not enabled")
continue
@@ -53,7 +55,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
outfile := fillDates(job.Outfile)
servers := strings.Join(job.Servers, ",")
if servers == "" {
- servers = config.Server.SSHBindAddress
+ servers = c.cfg.Server.SSHBindAddress
}
args := config.Args{
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index a4fda97..36d9ef3 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -4,7 +4,6 @@ import (
"context"
"strings"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/server"
)
@@ -22,11 +21,11 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
-
+
// If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate
// Turbo boost is enabled by default and is a server-side optimization
- dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", config.Server.TurboBoostDisable, "serverless", serverHandler.serverless)
- if !config.Server.TurboBoostDisable && !serverHandler.serverless {
+ dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless)
+ if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless {
dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
turboAggregate, err := server.NewTurboAggregate(queryStr)
if err != nil {
@@ -35,7 +34,7 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
m.turboAggregate = turboAggregate
return m, nil, turboAggregate, nil
}
-
+
// Otherwise, create a regular Aggregate
aggregate, err := server.NewAggregate(queryStr)
if err != nil {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 3294bdd..0375807 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -10,7 +10,6 @@ import (
"sync/atomic"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -107,30 +106,30 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob)
-
+
// Track pending files for this batch
atomic.AddInt32(&r.server.pendingFiles, int32(len(paths)))
dlog.Server.Info(r.server.user, "Added pending files", "count", len(paths), "totalPending", atomic.LoadInt32(&r.server.pendingFiles))
-
+
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
-
+
dlog.Server.Info(r.server.user, "All files processed", "count", len(paths))
// In turbo mode, signal EOF after all files are processed
// This is crucial for proper shutdown in server mode
- if !config.Server.TurboBoostDisable && r.server.aggregate == nil &&
+ if !r.server.serverCfg.TurboBoostDisable && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
if r.server.IsTurboMode() && r.server.turboEOF != nil {
dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal")
-
+
// Ensure all turbo data is flushed before signaling EOF
r.server.flushTurboData()
-
+
// Signal EOF by closing the channel, but only if it hasn't been closed yet
select {
case <-r.server.turboEOF:
@@ -138,7 +137,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
default:
close(r.server.turboEOF)
}
-
+
// Wait to ensure all data is transmitted
// This is especially important when files are queued due to concurrency limits
// In serverless mode, data is written directly to stdout, so no wait is needed
@@ -170,7 +169,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
// Decrement pending files counter when this file is done
remaining := atomic.AddInt32(&r.server.pendingFiles, -1)
dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining)
-
+
// Check if we should trigger shutdown now
// Only shutdown if no files are pending AND no commands are active
if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
@@ -185,7 +184,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
time.Sleep(500 * time.Millisecond)
}
}
-
+
// Double-check that we really have no pending work
// In turbo mode, there might be a race condition
// In serverless mode, no need for this delay
@@ -202,7 +201,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
}
}
}()
-
+
globID := r.makeGlobID(path, glob)
if !r.server.user.HasFilePermission(path, "readfiles") {
dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
@@ -217,7 +216,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
dlog.Server.Info(r.server.user, "Start reading", path, globID)
-
+
// Log if grep is using literal mode optimization
if r.mode == omode.GrepClient {
if re.IsLiteral() {
@@ -226,7 +225,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
}
}
-
+
var reader fs.FileReader
var limiter chan struct{}
@@ -270,12 +269,12 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// Check if we should use the turbo boost optimizations
// Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
// MapReduce now has a turbo mode implementation that bypasses channels
- dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboBoostDisable", config.Server.TurboBoostDisable,
+ dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboBoostDisable", r.server.serverCfg.TurboBoostDisable,
"mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil, "hasAggregate", r.server.aggregate != nil)
// Only use turbo mode if:
// 1. Turbo boost is NOT disabled (it's enabled by default) AND
// 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate)
- if !config.Server.TurboBoostDisable &&
+ if !r.server.serverCfg.TurboBoostDisable &&
(r.server.turboAggregate != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && r.server.aggregate == nil)) {
dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
@@ -296,7 +295,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// For non-MapReduce operations, use the server's lines channel
lines = r.server.lines
}
-
+
if err := reader.Start(ctx, ltx, lines, re); err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
@@ -322,7 +321,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
path, globID string, re regex.Regex, reader fs.FileReader) {
dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
-
+
// Log if grep is using literal mode optimization
if r.mode == omode.GrepClient {
if re.IsLiteral() {
@@ -337,7 +336,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
var lines chan *line.Line
// Use the optimized version if turbo boost is not disabled (enabled by default)
- turboBoostEnabled := !config.Server.TurboBoostDisable
+ turboBoostEnabled := !r.server.serverCfg.TurboBoostDisable
for {
if aggregate != nil {
@@ -387,7 +386,7 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
path, globID string, re regex.Regex, reader fs.FileReader) {
dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
-
+
// Log if grep is using literal mode optimization
if r.mode == omode.GrepClient {
if re.IsLiteral() {
@@ -422,14 +421,14 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
for {
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
-
+
// Create a processor based on whether we're doing MapReduce or not
var processor interface {
ProcessLine(*bytes.Buffer, uint64, string) error
Flush() error
Close() error
}
-
+
if r.server.turboAggregate != nil {
// Use turbo aggregate processor for MapReduce operations
dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
@@ -457,7 +456,7 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
processor.Close()
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
-
+
// Give time for data to be transmitted
// This is crucial for integration tests to ensure all data is sent
// Skip this delay in serverless mode since data is written directly to stdout
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 645e2e9..92619d7 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -21,6 +21,7 @@ type ServerHandler struct {
baseHandler
catLimiter chan struct{}
tailLimiter chan struct{}
+ serverCfg *config.ServerConfig
regex string
// Track pending files waiting for limiter slots
pendingFiles int32
@@ -30,9 +31,13 @@ var _ Handler = (*ServerHandler)(nil)
// NewServerHandler returns the server handler.
func NewServerHandler(user *user.User, catLimiter,
- tailLimiter chan struct{}) *ServerHandler {
+ tailLimiter chan struct{}, serverCfg *config.ServerConfig) *ServerHandler {
dlog.Server.Debug(user, "Creating new server handler")
+ if serverCfg == nil {
+ dlog.Server.FatalPanic("Missing server config in NewServerHandler")
+ }
+
h := ServerHandler{
baseHandler: baseHandler{
done: internal.NewDone(),
@@ -44,6 +49,7 @@ func NewServerHandler(user *user.User, catLimiter,
},
catLimiter: catLimiter,
tailLimiter: tailLimiter,
+ serverCfg: serverCfg,
regex: ".",
}
h.handleCommandCb = h.handleUserCommand
@@ -68,7 +74,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
activeCommands := h.decrementActiveCommands()
pendingFiles := atomic.LoadInt32(&h.pendingFiles)
dlog.Server.Debug(h.user, "Command finished", "activeCommands", activeCommands, "pendingFiles", pendingFiles)
-
+
// Only shutdown if no active commands AND no pending files
if activeCommands == 0 && pendingFiles == 0 {
h.shutdown()
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index cc51b7e..b8ccd9b 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -16,10 +16,12 @@ import (
gossh "golang.org/x/crypto/ssh"
)
-type scheduler struct{}
+type scheduler struct {
+ cfg config.RuntimeConfig
+}
-func newScheduler() *scheduler {
- return &scheduler{}
+func newScheduler(cfg config.RuntimeConfig) *scheduler {
+ return &scheduler{cfg: cfg}
}
func (s *scheduler) start(ctx context.Context) {
@@ -38,7 +40,7 @@ func (s *scheduler) start(ctx context.Context) {
}
func (s *scheduler) runJobs(ctx context.Context) {
- for _, job := range config.Server.Schedule {
+ for _, job := range s.cfg.Server.Schedule {
if !job.Enable {
dlog.Server.Debug(job.Name, "Not running job as not enabled")
continue
@@ -68,7 +70,7 @@ func (s *scheduler) runJob(ctx context.Context, job config.Scheduled) {
servers := strings.Join(job.Servers, ",")
if servers == "" {
- servers = config.Server.SSHBindAddress
+ servers = s.cfg.Server.SSHBindAddress
}
args := config.Args{
ConnectionsPerCPU: config.DefaultConnectionsPerCPU,
diff --git a/internal/server/server.go b/internal/server/server.go
index 38b042f..b4c4406 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -20,6 +20,7 @@ import (
// Server is the main server data structure.
type Server struct {
+ cfg config.RuntimeConfig
// Various server statistics counters.
stats stats
// SSH server configuration.
@@ -35,21 +36,27 @@ type Server struct {
}
// New returns a new server.
-func New() *Server {
+func New(cfg config.RuntimeConfig) *Server {
+ if cfg.Server == nil || cfg.Common == nil {
+ dlog.Server.FatalPanic("Missing runtime server/common configuration")
+ }
+
dlog.Server.Info("Starting server", version.String())
s := Server{
+ cfg: cfg,
sshServerConfig: &gossh.ServerConfig{
Config: gossh.Config{
- KeyExchanges: config.Server.KeyExchanges,
- Ciphers: config.Server.Ciphers,
- MACs: config.Server.MACs,
+ KeyExchanges: cfg.Server.KeyExchanges,
+ Ciphers: cfg.Server.Ciphers,
+ MACs: cfg.Server.MACs,
},
},
- catLimiter: make(chan struct{}, config.Server.MaxConcurrentCats),
- tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails),
- sched: newScheduler(),
- cont: newContinuous(),
+ stats: newStats(cfg.Server.MaxConnections),
+ catLimiter: make(chan struct{}, cfg.Server.MaxConcurrentCats),
+ tailLimiter: make(chan struct{}, cfg.Server.MaxConcurrentTails),
+ sched: newScheduler(cfg),
+ cont: newContinuous(cfg),
}
s.sshServerConfig.PasswordCallback = s.Callback
@@ -67,7 +74,7 @@ func New() *Server {
// Start the server.
func (s *Server) Start(ctx context.Context) int {
dlog.Server.Info("Starting server")
- bindAt := fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort)
+ bindAt := fmt.Sprintf("%s:%d", s.cfg.Server.SSHBindAddress, s.cfg.Common.SSHPort)
dlog.Server.Info("Binding server", bindAt)
listener, err := net.Listen("tcp", bindAt)
@@ -193,7 +200,7 @@ func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn,
case config.HealthUser:
handler = handlers.NewHealthHandler(user)
default:
- handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
+ handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.cfg.Server)
}
terminate := func() {
@@ -262,14 +269,14 @@ func (s *Server) Callback(c gossh.ConnMetadata,
return nil, nil
}
case config.ScheduleUser:
- for _, job := range config.Server.Schedule {
+ for _, job := range s.cfg.Server.Schedule {
if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
dlog.Server.Debug(user, "Granting SSH connection")
return nil, nil
}
}
case config.ContinuousUser:
- for _, job := range config.Server.Continuous {
+ for _, job := range s.cfg.Server.Continuous {
if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
dlog.Server.Debug(user, "Granting SSH connection")
return nil, nil
diff --git a/internal/server/stats.go b/internal/server/stats.go
index 99a644a..2509321 100644
--- a/internal/server/stats.go
+++ b/internal/server/stats.go
@@ -6,7 +6,6 @@ import (
"sync"
"time"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
)
@@ -15,6 +14,13 @@ type stats struct {
mutex sync.Mutex
currentConnections int
lifetimeConnections uint64
+ maxConnections int
+}
+
+func newStats(maxConnections int) stats {
+ return stats{
+ maxConnections: maxConnections,
+ }
}
func (s *stats) incrementConnections() {
@@ -57,9 +63,9 @@ func (s *stats) serverLimitExceeded() error {
s.mutex.Lock()
defer s.mutex.Unlock()
- if s.currentConnections >= config.Server.MaxConnections {
+ if s.currentConnections >= s.maxConnections {
return fmt.Errorf("Exceeded max allowed concurrent connections of %d",
- config.Server.MaxConnections)
+ s.maxConnections)
}
return nil
}