// Package server provides the DTail server implementation that handles SSH // connections from DTail clients and processes distributed log operations. // The server acts as an SSH daemon listening on port 2222 by default, providing // secure multi-user access to log files with proper authentication and resource management. // // Key features: // - SSH server with configurable authentication methods // - Multi-user support with different privilege levels // - Resource management with configurable connection and operation limits // - Background services for scheduled and continuous monitoring jobs // - Handler routing system for different client operations // - Real-time statistics and connection tracking // // The server supports several user types: // - Regular users: Standard SSH public key authentication // - Health users: Special authentication for health checking // - Scheduled users: Background jobs with IP-based access control // - Continuous users: Long-running monitoring jobs with IP-based access control // // Each connection is handled in its own goroutine with proper resource cleanup // and statistics tracking. The server enforces connection limits to prevent // resource exhaustion and provides graceful shutdown capabilities. package server import ( "context" "errors" "fmt" "io" "net" "strings" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" gossh "golang.org/x/crypto/ssh" ) // Server represents the main DTail server instance that manages SSH connections, // user authentication, and distributed log operations. It coordinates multiple // subsystems including connection handling, resource limiting, and background services. type Server struct { // stats tracks real-time server statistics including connection counts, // active operations, and resource utilization metrics stats stats // sshServerConfig contains the SSH server configuration including // supported key exchanges, ciphers, MACs, and authentication callbacks sshServerConfig *gossh.ServerConfig // catLimiter controls the maximum number of concurrent cat operations // to prevent resource exhaustion from too many simultaneous file reads catLimiter chan struct{} // tailLimiter controls the maximum number of concurrent tail operations // to manage long-running file monitoring connections tailLimiter chan struct{} // sched manages scheduled MapReduce jobs that run at specified intervals // with configurable authentication and access control sched *scheduler // cont manages continuous monitoring jobs that watch log files for // specific patterns and trigger actions when matches are found cont *continuous } // New creates and initializes a new DTail server instance with all necessary // components configured. This constructor sets up SSH server configuration, // resource limiters, authentication callbacks, and background services. // // Returns: // *Server: Fully configured server instance ready to start // // The initialization process: // 1. Creates SSH server configuration with cryptographic settings // 2. Sets up resource limiters for concurrent operations // 3. Configures authentication callbacks for different user types // 4. Generates or loads SSH host keys // 5. Initializes background services (scheduler and continuous monitoring) // // The server is ready to call Start() after construction. func New() *Server { dlog.Server.Info("Starting server", version.String()) s := Server{ sshServerConfig: &gossh.ServerConfig{ Config: gossh.Config{ KeyExchanges: config.Server.KeyExchanges, Ciphers: config.Server.Ciphers, MACs: config.Server.MACs, }, }, catLimiter: make(chan struct{}, config.Server.MaxConcurrentCats), tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), sched: newScheduler(), cont: newContinuous(), } s.sshServerConfig.PasswordCallback = s.Callback s.sshServerConfig.PublicKeyCallback = server.PublicKeyCallback private, err := gossh.ParsePrivateKey(server.PrivateHostKey()) if err != nil { dlog.Server.FatalPanic(err) } s.sshServerConfig.AddHostKey(private) return &s } // Start begins the server operation by binding to the configured address, // starting background services, and entering the main connection acceptance loop. // This method handles the complete server lifecycle including graceful shutdown. // // Parameters: // ctx: Context for controlling server shutdown and cancellation // // Returns: // int: Exit status code (currently always returns 0) // // The startup process: // 1. Binds to the configured SSH port and address // 2. Starts statistics collection in background // 3. Starts scheduled job processor // 4. Starts continuous monitoring processor // 5. Begins the main connection acceptance loop // 6. Blocks until context cancellation triggers shutdown func (s *Server) Start(ctx context.Context) int { dlog.Server.Info("Starting server") bindAt := fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort) dlog.Server.Info("Binding server", bindAt) listener, err := net.Listen("tcp", bindAt) if err != nil { dlog.Server.FatalPanic("Failed to open listening TCP socket", err) } go s.stats.start(ctx) go s.sched.start(ctx) go s.cont.start(ctx) go s.listenerLoop(ctx, listener) <-ctx.Done() // For future use. return 0 } func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) { dlog.Server.Debug("Starting listener loop") for { conn, err := listener.Accept() // Blocking if err != nil { select { case <-ctx.Done(): return default: } dlog.Server.Error("Failed to accept incoming connection", err) continue } if err := s.stats.serverLimitExceeded(); err != nil { dlog.Server.Error(err) conn.Close() continue } go s.handleConnection(ctx, conn) } } func (s *Server) handleConnection(ctx context.Context, conn net.Conn) { dlog.Server.Info("Handling connection") sshConn, chans, reqs, err := gossh.NewServerConn(conn, s.sshServerConfig) if err != nil { dlog.Server.Error("Something just happened", err) return } s.stats.incrementConnections() go gossh.DiscardRequests(reqs) for newChannel := range chans { go s.handleChannel(ctx, sshConn, newChannel) } } func (s *Server) handleChannel(ctx context.Context, sshConn gossh.Conn, newChannel gossh.NewChannel) { user, err := user.New(sshConn.User(), sshConn.RemoteAddr().String()) if err != nil { dlog.Server.Error(user, err) newChannel.Reject(gossh.Prohibited, err.Error()) return } dlog.Server.Info(user, "Invoking channel handler") if newChannel.ChannelType() != "session" { err := errors.New("Don'w allow other channel types than session") dlog.Server.Error(user, err) newChannel.Reject(gossh.Prohibited, err.Error()) return } channel, requests, err := newChannel.Accept() if err != nil { dlog.Server.Error(user, "Could not accept channel", err) return } if err := s.handleRequests(ctx, sshConn, requests, channel, user); err != nil { dlog.Server.Error(user, err) sshConn.Close() } } func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-chan *gossh.Request, channel gossh.Channel, user *user.User) error { dlog.Server.Info(user, "Invoking request handler") for req := range in { var payload = struct{ Value string }{} gossh.Unmarshal(req.Payload, &payload) switch req.Type { case "shell": var handler handlers.Handler switch user.Name { case config.HealthUser: handler = handlers.NewHealthHandler(user) default: handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) } terminate := func() { handler.Shutdown() sshConn.Close() } go func() { // Broken pipe, cancel io.Copy(channel, handler) terminate() }() go func() { // Broken pipe, cancel io.Copy(handler, channel) terminate() }() go func() { select { case <-ctx.Done(): case <-handler.Done(): } terminate() }() go func() { if err := sshConn.Wait(); err != nil && err != io.EOF { dlog.Server.Error(user, err) } s.stats.decrementConnections() dlog.Server.Info(user, "Good bye Mister!") terminate() }() // Only serving shell type req.Reply(true, nil) default: req.Reply(false, nil) return fmt.Errorf("Closing SSH connection as unknown request received|%s|%v", req.Type, payload.Value) } } return nil } // Callback for SSH authentication. func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { user, err := user.New(c.User(), c.RemoteAddr().String()) if err != nil { return nil, err } authInfo := string(authPayload) splitted := strings.Split(c.RemoteAddr().String(), ":") remoteIP := splitted[0] switch user.Name { case config.HealthUser: if authInfo == config.HealthUser { dlog.Server.Debug(user, "Granting permissions to health user") return nil, nil } case config.ScheduleUser: for _, job := range config.Server.Schedule { if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { dlog.Server.Debug(user, "Granting SSH connection") return nil, nil } } case config.ContinuousUser: for _, job := range config.Server.Continuous { if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { dlog.Server.Debug(user, "Granting SSH connection") return nil, nil } } default: } return nil, fmt.Errorf("user %s not authorized", user) } func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, allowedJobName string, allowFrom []string) bool { dlog.Server.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom) if jobName != allowedJobName { dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Job name does not match, skipping to next one...", allowedJobName) return false } for _, myAddr := range allowFrom { ips, err := net.LookupIP(myAddr) if err != nil { dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP "+ "address for allowed hosts lookup, skipping to next one...", myAddr, err) continue } for _, ip := range ips { dlog.Server.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", remoteIP, ip.String()) if remoteIP == ip.String() { return true } } } return false }