diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-05 10:00:38 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-05 10:00:38 +0300 |
| commit | f70622f307629a2542ea5eb128dea8c1043d3a40 (patch) | |
| tree | 82455dac0c870b28aea8c96a426050dc215a8818 /internal/clients | |
| parent | 599075bc6580ba77dc22ba1c1ec8aa908ef2462d (diff) | |
more on this
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/baseclient.go | 8 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 19 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 22 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 114 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 97 |
5 files changed, 86 insertions, 174 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index fc01955..5ac298f 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -86,7 +86,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i var mutex sync.Mutex for i, conn := range c.connections { go func(i int, conn connectors.Connector) { - connStatus := c.start(ctx, active, i, conn) + connStatus := c.startConnection(ctx, active, i, conn) // Update global status. mutex.Lock() @@ -97,11 +97,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i }(i, conn) } + time.Sleep(time.Second * 2) c.waitUntilDone(ctx, active) return } -func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { +func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { // Increment connection count active <- struct{}{} // Derement connection count @@ -146,12 +147,13 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { <-ctx.Done() } + // TODO: Rewrite this to use a wait group. for { numActive := len(active) if numActive == 0 { return } dlog.Client.Debug("Active connections", numActive) - time.Sleep(time.Second) + time.Sleep(time.Second * time.Millisecond * 100) } } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 5bc63ee..1666a79 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -23,7 +23,6 @@ type ServerConnection struct { config *ssh.ClientConfig handler handlers.Handler commands []string - isOneOff bool hostKeyCallback client.HostKeyCallback throttlingDone bool } @@ -49,24 +48,6 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM return &c } -// NewOneOffServerConnection creates new one-off connection (only for sending a series of commands and then quit). -func NewOneOffServerConnection(server string, userName string, authMethods []ssh.AuthMethod, handler handlers.Handler, commands []string) *ServerConnection { - c := ServerConnection{ - server: server, - handler: handler, - commands: commands, - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - }, - isOneOff: true, - } - - c.initServerPort() - return &c -} - func (c *ServerConnection) Server() string { return c.server } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 7740aab..ae72c9b 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -20,14 +20,12 @@ type Serverless struct { // NewServerConnection returns a new connection. func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { - s := Serverless{ + dlog.Client.Debug("Creating new serverless connector", handler, commands) + return &Serverless{ userName: userName, handler: handler, commands: commands, } - - dlog.Client.Debug("Creating new serverless connector", handler, commands) - return &s } func (s *Serverless) Server() string { @@ -58,11 +56,17 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro return err } - serverHandler := serverHandlers.NewServerHandler( - user, - make(chan struct{}, config.Server.MaxConcurrentCats), - make(chan struct{}, config.Server.MaxConcurrentTails), - ) + var serverHandler serverHandlers.Handler + switch s.userName { + case config.ControlUser: + serverHandler = serverHandlers.NewControlHandler(user) + default: + serverHandler = serverHandlers.NewServerHandler( + user, + make(chan struct{}, config.Server.MaxConcurrentCats), + make(chan struct{}, config.Server.MaxConcurrentTails), + ) + } terminate := func() { serverHandler.Shutdown() diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index eca0348..4949985 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,90 +1,72 @@ package handlers import ( - "bytes" - "errors" "fmt" - "time" + "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler implements the handler required for health checks. +// HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { - done *internal.Done - // Buffer of incoming data from server. - receiveBuf bytes.Buffer - // To send commands to the server. - commands chan string - // To receive messages from the server. - receive chan<- string - // The remote server address - server string - // The return status. - status int + baseHandler + HealthStatusCh chan<- int } -// NewHealthHandler returns a new health check handler. -func NewHealthHandler(server string, receive chan<- string) *HealthHandler { - h := HealthHandler{ - server: server, - receive: receive, - commands: make(chan string), - status: -1, - done: internal.NewDone(), +// NewHealthHandler returns a new health client handler. +func NewHealthHandler(server string) *HealthHandler { + dlog.Client.Debug(server, "Creating new health handler") + return &HealthHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + status: -1, + done: internal.NewDone(), + }, + HealthStatusCh: make(chan int), } - - return &h -} - -// Server returns the remote server name. -func (h *HealthHandler) Server() string { - return h.server -} - -// Status of the handler. -func (h *HealthHandler) Status() int { - return h.status -} - -// Done returns done channel of the handler. -func (h *HealthHandler) Done() <-chan struct{} { - return h.done.Done() } -// Shutdown the handler. -func (h *HealthHandler) Shutdown() { - h.done.Shutdown() -} - -// SendMessage sends a DTail command to the server. -func (h *HealthHandler) SendMessage(command string) error { - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.NewTimer(time.Second * 10).C: - return errors.New("Timed out sending command " + command) - case <-h.Done(): - } - - return nil -} - -// Server writes byte stream to client. +// Read data from the dtail server via Writer interface. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf.WriteByte(b) - if b == protocol.MessageDelimiter { - h.receive <- h.receiveBuf.String() - h.receiveBuf.Reset() + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + dlog.Client.Debug(message) + h.handleHealthMessage(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } -// Server reads byte stream from client. -func (h *HealthHandler) Read(p []byte) (n int, err error) { - n = copy(p, []byte(<-h.commands)) - return +func (h *HealthHandler) handleHealthMessage(message string) { + s := strings.Split(message, protocol.FieldDelimiter) + message = s[len(s)-1] + status := strings.Split(message, ":") + fmt.Println(status) + /* + switch status { + case "OK": + h.HealthStatusCh <- 0 + case "WARNING": + h.HealthStatusCh <- 1 + case "CRITICAL": + h.HealthStatusCh <- 2 + case "UNKNOWN": + h.HealthStatusCh <- 3 + default: + fmt.Println("CRITICAL: Unexpected server response: '%s'") + h.HealthStatusCh <- 2 + } + */ } diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index 47007b6..df919ae 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -1,101 +1,44 @@ package clients import ( - "context" - "fmt" "runtime" - "strings" - "time" - "github.com/mimecast/dtail/internal/clients/connectors" "github.com/mimecast/dtail/internal/clients/handlers" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/protocol" gossh "golang.org/x/crypto/ssh" ) -// HealthClient is used for health checking (e.g. via Nagios) +// HealthClient is used to perform a basic server health check. type HealthClient struct { - // Client operating mode - mode omode.Mode - // The remote server address - server string - // SSH user name - userName string - // SSH auth methods to use to connect to the remote servers. - sshAuthMethods []gossh.AuthMethod + baseClient } -// NewHealthClient returns a new healh client. -func NewHealthClient(mode omode.Mode) (*HealthClient, error) { +// NewHealthClient returns a new health client. +func NewHealthClient(args config.Args) (*HealthClient, error) { + args.Mode = omode.HealthClient + args.UserName = config.ControlUser c := HealthClient{ - mode: mode, - server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort), - userName: config.ControlUser, + baseClient: baseClient{ + Args: args, + throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), + retry: false, + }, } - c.initSSHAuthMethods() + + c.init() + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) + c.makeConnections(c) return &c, nil } -// Start the health client. -func (c *HealthClient) Start(ctx context.Context) (status int) { - receive := make(chan string) - - throttleCh := make(chan struct{}, runtime.NumCPU()) - statsCh := make(chan struct{}, 1) - - conn := connectors.NewOneOffServerConnection( - c.server, - c.userName, - c.sshAuthMethods, - handlers.NewHealthHandler(c.server, receive), - []string{c.mode.String()}, - ) - - connCtx, cancel := context.WithCancel(ctx) - go conn.Start(connCtx, cancel, throttleCh, statsCh) - - for { - select { - case data := <-receive: - // Parse recieved data. - s := strings.Split(data, protocol.FieldDelimiter) - message := s[len(s)-1] - if strings.HasPrefix(message, "done;") { - return - } - - // Set severity. - s = strings.Split(message, ":") - switch s[0] { - case "OK": - case "WARNING": - if status < 1 { - status = 1 - } - case "CRITICAL": - status = 2 - case "UNKNOWN": - status = 3 - default: - fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message) - status = 2 - return - } - fmt.Print(message) - - case <-time.After(time.Second * 2): - status = 2 - fmt.Println("CRITICAL: Could not communicate with DTail server") - return - } - } +func (c HealthClient) makeHandler(server string) handlers.Handler { + return handlers.NewHealthHandler(server) } -// Initialize SSH auth methods. -func (c *HealthClient) initSSHAuthMethods() { - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) +func (c HealthClient) makeCommands() (commands []string) { + commands = append(commands, "health") + return } |
