| author | Paul Buetow <paul@buetow.org> | 2026-03-02 08:55:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 08:55:50 +0200 |
| commit | 50a40f6e77e9f9a6f65e0596c789f67b91f6a6e1 (patch) | |
| tree | a1eff17c9d0d8afbf7eb55e9f2593c2647c5b62d /internal | |
| parent | bbbb7461d19e611e6fab3f24edd5f8e0d2d45b1e (diff) | |
Refactor server path to use injected runtime config (task 329)
Diffstat (limited to 'internal')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | internal/clients/connectors/serverless.go | 23 |
| -rw-r--r-- | internal/config/runtime.go | 18 |
| -rw-r--r-- | internal/server/continuous.go | 12 |
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 9 |
| -rw-r--r-- | internal/server/handlers/readcommand.go | 43 |
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 10 |
| -rw-r--r-- | internal/server/scheduler.go | 12 |
| -rw-r--r-- | internal/server/server.go | 31 |
| -rw-r--r-- | internal/server/stats.go | 12 |
9 files changed, 105 insertions, 65 deletions
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index eaaa770..74cd9e6 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -75,6 +75,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 		user,
 		make(chan struct{}, config.Server.MaxConcurrentCats),
 		make(chan struct{}, config.Server.MaxConcurrentTails),
+		config.Server,
 	)
 }
@@ -86,14 +87,14 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 
 	// Use buffered channels to prevent deadlock
 	// This approach avoids the circular dependency of direct io.Copy
-
+
 	// Channels for data flow
 	toServer := make(chan []byte, 100)
 	fromServer := make(chan []byte, 100)
-
+
 	// Error tracking
 	errChan := make(chan error, 4)
-
+
 	// Read from client handler
 	go func() {
 		defer close(toServer)
@@ -117,7 +118,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 			}
 		}
 	}()
-
+
 	// Write to server handler
 	go func() {
 		for data := range toServer {
@@ -127,7 +128,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 			}
 		}
 	}()
-
+
 	// Read from server handler
 	go func() {
 		defer close(fromServer)
@@ -151,7 +152,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 			}
 		}
 	}()
-
+
 	// Write to client handler
 	serverDone := make(chan struct{})
 	go func() {
@@ -163,7 +164,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 			}
 		}
 	}()
-
+
 	// Send commands after setting up the data flow
 	for _, command := range s.commands {
 		dlog.Client.Debug("Sending command to serverless server", command)
@@ -171,7 +172,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 			dlog.Client.Debug(err)
 		}
 	}
-
+
 	// Monitor for completion
 	go func() {
 		defer terminate()
@@ -184,17 +185,17 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
 			dlog.Client.Trace("<-ctx.Done()")
 		}
 	}()
-
+
 	// Wait for completion
 	<-ctx.Done()
-
+
 	// Check for errors
 	select {
 	case err := <-errChan:
 		return err
 	default:
 	}
-
+
 	s.handler.Shutdown()
 	return nil
 }
diff --git a/internal/config/runtime.go b/internal/config/runtime.go
new file mode 100644
index 0000000..1e19265
--- /dev/null
+++ b/internal/config/runtime.go
@@ -0,0 +1,18 @@
+package config
+
+// RuntimeConfig contains the active runtime configuration for a process.
+// It is intended to be injected into components instead of relying on package globals.
+type RuntimeConfig struct {
+	Client *ClientConfig
+	Server *ServerConfig
+	Common *CommonConfig
+}
+
+// CurrentRuntime returns the currently initialized runtime configuration.
+func CurrentRuntime() RuntimeConfig {
+	return RuntimeConfig{
+		Client: Client,
+		Server: Server,
+		Common: Common,
+	}
+}
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index ac5c686..90e5f8a 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -13,10 +13,12 @@ import (
 	gossh "golang.org/x/crypto/ssh"
 )
 
-type continuous struct{}
+type continuous struct {
+	cfg config.RuntimeConfig
+}
 
-func newContinuous() *continuous {
-	return &continuous{}
+func newContinuous(cfg config.RuntimeConfig) *continuous {
+	return &continuous{cfg: cfg}
 }
 
 func (c *continuous) start(ctx context.Context) {
@@ -26,7 +28,7 @@ func (c *continuous) start(ctx context.Context) {
 }
 
 func (c *continuous) runJobs(ctx context.Context) {
-	for _, job := range config.Server.Continuous {
+	for _, job := range c.cfg.Server.Continuous {
 		if !job.Enable {
 			dlog.Server.Debug(job.Name, "Not running job as not enabled")
 			continue
@@ -53,7 +55,7 @@ func (c *continuous) runJob(ctx context.Context, job config.Continuous) {
 	outfile := fillDates(job.Outfile)
 	servers := strings.Join(job.Servers, ",")
 	if servers == "" {
-		servers = config.Server.SSHBindAddress
+		servers = c.cfg.Server.SSHBindAddress
 	}
 
 	args := config.Args{
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index a4fda97..36d9ef3 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"strings"
 
-	"github.com/mimecast/dtail/internal/config"
 	"github.com/mimecast/dtail/internal/io/dlog"
 	"github.com/mimecast/dtail/internal/mapr/server"
 )
@@ -22,11 +21,11 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
 
 	m := mapCommand{server: serverHandler}
 	queryStr := strings.Join(args[1:], " ")
-
+
 	// If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate
 	// Turbo boost is enabled by default and is a server-side optimization
-	dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", config.Server.TurboBoostDisable, "serverless", serverHandler.serverless)
-	if !config.Server.TurboBoostDisable && !serverHandler.serverless {
+	dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless)
+	if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless {
 		dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
 		turboAggregate, err := server.NewTurboAggregate(queryStr)
 		if err != nil {
@@ -35,7 +34,7 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
 		m.turboAggregate = turboAggregate
 		return m, nil, turboAggregate, nil
 	}
-
+
 	// Otherwise, create a regular Aggregate
 	aggregate, err := server.NewAggregate(queryStr)
 	if err != nil {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 3294bdd..0375807 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -10,7 +10,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/mimecast/dtail/internal/config"
 	"github.com/mimecast/dtail/internal/io/dlog"
 	"github.com/mimecast/dtail/internal/io/fs"
 	"github.com/mimecast/dtail/internal/io/line"
@@ -107,30 +106,30 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
 	paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
 
 	dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob)
-
+
 	// Track pending files for this batch
 	atomic.AddInt32(&r.server.pendingFiles, int32(len(paths)))
 	dlog.Server.Info(r.server.user, "Added pending files", "count", len(paths), "totalPending", atomic.LoadInt32(&r.server.pendingFiles))
-
+
 	var wg sync.WaitGroup
 	wg.Add(len(paths))
 	for _, path := range paths {
 		go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
 	}
 	wg.Wait()
-
+
 	dlog.Server.Info(r.server.user, "All files processed", "count", len(paths))
 
 	// In turbo mode, signal EOF after all files are processed
 	// This is crucial for proper shutdown in server mode
-	if !config.Server.TurboBoostDisable && r.server.aggregate == nil &&
+	if !r.server.serverCfg.TurboBoostDisable && r.server.aggregate == nil &&
 		(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
 		if r.server.IsTurboMode() && r.server.turboEOF != nil {
 			dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal")
-
+
 			// Ensure all turbo data is flushed before signaling EOF
 			r.server.flushTurboData()
-
+
 			// Signal EOF by closing the channel, but only if it hasn't been closed yet
 			select {
 			case <-r.server.turboEOF:
@@ -138,7 +137,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
 			default:
 				close(r.server.turboEOF)
 			}
-
+
 			// Wait to ensure all data is transmitted
 			// This is especially important when files are queued due to concurrency limits
 			// In serverless mode, data is written directly to stdout, so no wait is needed
@@ -170,7 +169,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
 		// Decrement pending files counter when this file is done
 		remaining := atomic.AddInt32(&r.server.pendingFiles, -1)
 		dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining)
-
+
 		// Check if we should trigger shutdown now
 		// Only shutdown if no files are pending AND no commands are active
 		if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
@@ -185,7 +184,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
 				time.Sleep(500 * time.Millisecond)
 			}
 		}
-
+
 		// Double-check that we really have no pending work
 		// In turbo mode, there might be a race condition
 		// In serverless mode, no need for this delay
@@ -202,7 +201,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
 			}
 		}
 	}()
-
+
 	globID := r.makeGlobID(path, glob)
 	if !r.server.user.HasFilePermission(path, "readfiles") {
 		dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
@@ -217,7 +216,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
 	path, globID string, re regex.Regex) {
 
 	dlog.Server.Info(r.server.user, "Start reading", path, globID)
-
+
 	// Log if grep is using literal mode optimization
 	if r.mode == omode.GrepClient {
 		if re.IsLiteral() {
@@ -226,7 +225,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
 			dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
 		}
 	}
-
+
 	var reader fs.FileReader
 	var limiter chan struct{}
 
@@ -270,12 +269,12 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
 	// Check if we should use the turbo boost optimizations
 	// Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
 	// MapReduce now has a turbo mode implementation that bypasses channels
-	dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboBoostDisable", config.Server.TurboBoostDisable,
+	dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboBoostDisable", r.server.serverCfg.TurboBoostDisable,
 		"mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil, "hasAggregate", r.server.aggregate != nil)
 	// Only use turbo mode if:
 	// 1. Turbo boost is NOT disabled (it's enabled by default) AND
 	// 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate)
-	if !config.Server.TurboBoostDisable &&
+	if !r.server.serverCfg.TurboBoostDisable &&
 		(r.server.turboAggregate != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && r.server.aggregate == nil)) {
 		dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
 		r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
@@ -296,7 +295,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
 		// For non-MapReduce operations, use the server's lines channel
 		lines = r.server.lines
 	}
-
+
 	if err := reader.Start(ctx, ltx, lines, re); err != nil {
 		dlog.Server.Error(r.server.user, path, globID, err)
 	}
@@ -322,7 +321,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
 	path, globID string, re regex.Regex, reader fs.FileReader) {
 
 	dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
-
+
 	// Log if grep is using literal mode optimization
 	if r.mode == omode.GrepClient {
 		if re.IsLiteral() {
@@ -337,7 +336,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
 	var lines chan *line.Line
 
 	// Use the optimized version if turbo boost is not disabled (enabled by default)
-	turboBoostEnabled := !config.Server.TurboBoostDisable
+	turboBoostEnabled := !r.server.serverCfg.TurboBoostDisable
 
 	for {
 		if aggregate != nil {
@@ -387,7 +386,7 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
 	path, globID string, re regex.Regex, reader fs.FileReader) {
 
 	dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
-
+
 	// Log if grep is using literal mode optimization
 	if r.mode == omode.GrepClient {
 		if re.IsLiteral() {
@@ -422,14 +421,14 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
 
 	for {
 		dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
-
+
 		// Create a processor based on whether we're doing MapReduce or not
 		var processor interface {
 			ProcessLine(*bytes.Buffer, uint64, string) error
 			Flush() error
 			Close() error
 		}
-
+
 		if r.server.turboAggregate != nil {
 			// Use turbo aggregate processor for MapReduce operations
 			dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
@@ -457,7 +456,7 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
 		dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
 		processor.Close()
 		dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
-
+
 		// Give time for data to be transmitted
 		// This is crucial for integration tests to ensure all data is sent
 		// Skip this delay in serverless mode since data is written directly to stdout
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 645e2e9..92619d7 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -21,6 +21,7 @@ type ServerHandler struct {
 	baseHandler
 	catLimiter  chan struct{}
 	tailLimiter chan struct{}
+	serverCfg   *config.ServerConfig
 	regex       string
 	// Track pending files waiting for limiter slots
 	pendingFiles int32
@@ -30,9 +31,13 @@ var _ Handler = (*ServerHandler)(nil)
 
 // NewServerHandler returns the server handler.
 func NewServerHandler(user *user.User, catLimiter,
-	tailLimiter chan struct{}) *ServerHandler {
+	tailLimiter chan struct{}, serverCfg *config.ServerConfig) *ServerHandler {
 	dlog.Server.Debug(user, "Creating new server handler")
 
+	if serverCfg == nil {
+		dlog.Server.FatalPanic("Missing server config in NewServerHandler")
+	}
+
 	h := ServerHandler{
 		baseHandler: baseHandler{
 			done: internal.NewDone(),
@@ -44,6 +49,7 @@ func NewServerHandler(user *user.User, catLimiter,
 		},
 		catLimiter:  catLimiter,
 		tailLimiter: tailLimiter,
+		serverCfg:   serverCfg,
 		regex:       ".",
 	}
 	h.handleCommandCb = h.handleUserCommand
@@ -68,7 +74,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
 	activeCommands := h.decrementActiveCommands()
 	pendingFiles := atomic.LoadInt32(&h.pendingFiles)
 	dlog.Server.Debug(h.user, "Command finished", "activeCommands", activeCommands, "pendingFiles", pendingFiles)
-
+
 	// Only shutdown if no active commands AND no pending files
 	if activeCommands == 0 && pendingFiles == 0 {
 		h.shutdown()
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index cc51b7e..b8ccd9b 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -16,10 +16,12 @@ import (
 	gossh "golang.org/x/crypto/ssh"
 )
 
-type scheduler struct{}
+type scheduler struct {
+	cfg config.RuntimeConfig
+}
 
-func newScheduler() *scheduler {
-	return &scheduler{}
+func newScheduler(cfg config.RuntimeConfig) *scheduler {
+	return &scheduler{cfg: cfg}
 }
 
 func (s *scheduler) start(ctx context.Context) {
@@ -38,7 +40,7 @@ func (s *scheduler) start(ctx context.Context) {
 }
 
 func (s *scheduler) runJobs(ctx context.Context) {
-	for _, job := range config.Server.Schedule {
+	for _, job := range s.cfg.Server.Schedule {
 		if !job.Enable {
 			dlog.Server.Debug(job.Name, "Not running job as not enabled")
 			continue
@@ -68,7 +70,7 @@ func (s *scheduler) runJob(ctx context.Context, job config.Scheduled) {
 	servers := strings.Join(job.Servers, ",")
 	if servers == "" {
-		servers = config.Server.SSHBindAddress
+		servers = s.cfg.Server.SSHBindAddress
 	}
 
 	args := config.Args{
 		ConnectionsPerCPU: config.DefaultConnectionsPerCPU,
diff --git a/internal/server/server.go b/internal/server/server.go
index 38b042f..b4c4406 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -20,6 +20,7 @@ import (
 
 // Server is the main server data structure.
 type Server struct {
+	cfg config.RuntimeConfig
 	// Various server statistics counters.
 	stats stats
 	// SSH server configuration.
@@ -35,21 +36,27 @@ type Server struct {
 }
 
 // New returns a new server.
-func New() *Server {
+func New(cfg config.RuntimeConfig) *Server {
+	if cfg.Server == nil || cfg.Common == nil {
+		dlog.Server.FatalPanic("Missing runtime server/common configuration")
+	}
+
 	dlog.Server.Info("Starting server", version.String())
 
 	s := Server{
+		cfg: cfg,
 		sshServerConfig: &gossh.ServerConfig{
 			Config: gossh.Config{
-				KeyExchanges: config.Server.KeyExchanges,
-				Ciphers:      config.Server.Ciphers,
-				MACs:         config.Server.MACs,
+				KeyExchanges: cfg.Server.KeyExchanges,
+				Ciphers:      cfg.Server.Ciphers,
+				MACs:         cfg.Server.MACs,
 			},
 		},
-		catLimiter:  make(chan struct{}, config.Server.MaxConcurrentCats),
-		tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails),
-		sched:       newScheduler(),
-		cont:        newContinuous(),
+		stats:       newStats(cfg.Server.MaxConnections),
+		catLimiter:  make(chan struct{}, cfg.Server.MaxConcurrentCats),
+		tailLimiter: make(chan struct{}, cfg.Server.MaxConcurrentTails),
+		sched:       newScheduler(cfg),
+		cont:        newContinuous(cfg),
 	}
 
 	s.sshServerConfig.PasswordCallback = s.Callback
@@ -67,7 +74,7 @@ func New() *Server {
 // Start the server.
 func (s *Server) Start(ctx context.Context) int {
 	dlog.Server.Info("Starting server")
-	bindAt := fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort)
+	bindAt := fmt.Sprintf("%s:%d", s.cfg.Server.SSHBindAddress, s.cfg.Common.SSHPort)
 	dlog.Server.Info("Binding server", bindAt)
 
 	listener, err := net.Listen("tcp", bindAt)
@@ -193,7 +200,7 @@ func (s *Server) handleShellRequest(ctx context.Context, sshConn gossh.Conn,
 	case config.HealthUser:
 		handler = handlers.NewHealthHandler(user)
 	default:
-		handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
+		handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.cfg.Server)
 	}
 
 	terminate := func() {
@@ -262,14 +269,14 @@ func (s *Server) Callback(c gossh.ConnMetadata,
 			return nil, nil
 		}
 	case config.ScheduleUser:
-		for _, job := range config.Server.Schedule {
+		for _, job := range s.cfg.Server.Schedule {
 			if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
 				dlog.Server.Debug(user, "Granting SSH connection")
 				return nil, nil
 			}
 		}
 	case config.ContinuousUser:
-		for _, job := range config.Server.Continuous {
+		for _, job := range s.cfg.Server.Continuous {
 			if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) {
 				dlog.Server.Debug(user, "Granting SSH connection")
 				return nil, nil
diff --git a/internal/server/stats.go b/internal/server/stats.go
index 99a644a..2509321 100644
--- a/internal/server/stats.go
+++ b/internal/server/stats.go
@@ -6,7 +6,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/mimecast/dtail/internal/config"
 	"github.com/mimecast/dtail/internal/io/dlog"
 )
 
@@ -15,6 +14,13 @@ type stats struct {
 	mutex               sync.Mutex
 	currentConnections  int
 	lifetimeConnections uint64
+	maxConnections      int
+}
+
+func newStats(maxConnections int) stats {
+	return stats{
+		maxConnections: maxConnections,
+	}
 }
 
 func (s *stats) incrementConnections() {
@@ -57,9 +63,9 @@ func (s *stats) serverLimitExceeded() error {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
-	if s.currentConnections >= config.Server.MaxConnections {
+	if s.currentConnections >= s.maxConnections {
 		return fmt.Errorf("Exceeded max allowed concurrent connections of %d",
-			config.Server.MaxConnections)
+			s.maxConnections)
 	}
 	return nil
 }
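For context, here is a minimal sketch of how a caller might wire the refactored entry point together. The signatures it uses (`config.CurrentRuntime()`, `server.New(cfg)`, `(*Server).Start(ctx)`) come from the diff above; the surrounding `main`/`serve` scaffolding, and the assumption that the package-level configuration has already been loaded at that point, are hypothetical and not part of this commit.

```go
package main

import (
	"context"
	"os"

	"github.com/mimecast/dtail/internal/config"
	"github.com/mimecast/dtail/internal/server"
)

// serve snapshots the package-level configuration into an injectable
// RuntimeConfig and hands it to the server, instead of having the server
// read the config.Server/config.Common globals directly.
func serve(ctx context.Context) int {
	// CurrentRuntime copies the Client/Server/Common globals into a
	// RuntimeConfig value (see internal/config/runtime.go above). The
	// globals are assumed to have been initialized before this point.
	cfg := config.CurrentRuntime()

	// New fails fast via FatalPanic if cfg.Server or cfg.Common is nil.
	srv := server.New(cfg)

	// Start binds the SSH listener and returns an exit status.
	return srv.Start(ctx)
}

func main() {
	os.Exit(serve(context.Background()))
}
```

Per-connection handlers receive only the narrower `*config.ServerConfig` through `NewServerHandler`, so the handler layer stays decoupled from client and common settings while the scheduler and continuous jobs keep the full `RuntimeConfig`.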
