summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-05 10:00:38 +0300
committerPaul Buetow <paul@buetow.org>2021-10-05 10:00:38 +0300
commitf70622f307629a2542ea5eb128dea8c1043d3a40 (patch)
tree82455dac0c870b28aea8c96a426050dc215a8818 /internal/clients
parent599075bc6580ba77dc22ba1c1ec8aa908ef2462d (diff)
more on this
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go8
-rw-r--r--internal/clients/connectors/serverconnection.go19
-rw-r--r--internal/clients/connectors/serverless.go22
-rw-r--r--internal/clients/handlers/healthhandler.go114
-rw-r--r--internal/clients/healthclient.go97
5 files changed, 86 insertions, 174 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index fc01955..5ac298f 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -86,7 +86,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
var mutex sync.Mutex
for i, conn := range c.connections {
go func(i int, conn connectors.Connector) {
- connStatus := c.start(ctx, active, i, conn)
+ connStatus := c.startConnection(ctx, active, i, conn)
// Update global status.
mutex.Lock()
@@ -97,11 +97,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
}(i, conn)
}
+ time.Sleep(time.Second * 2)
c.waitUntilDone(ctx, active)
return
}
-func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) {
+func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) {
// Increment connection count
active <- struct{}{}
// Derement connection count
@@ -146,12 +147,13 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
<-ctx.Done()
}
+ // TODO: Rewrite this to use a wait group.
for {
numActive := len(active)
if numActive == 0 {
return
}
dlog.Client.Debug("Active connections", numActive)
- time.Sleep(time.Second)
+ time.Sleep(time.Second * time.Millisecond * 100)
}
}
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
index 5bc63ee..1666a79 100644
--- a/internal/clients/connectors/serverconnection.go
+++ b/internal/clients/connectors/serverconnection.go
@@ -23,7 +23,6 @@ type ServerConnection struct {
config *ssh.ClientConfig
handler handlers.Handler
commands []string
- isOneOff bool
hostKeyCallback client.HostKeyCallback
throttlingDone bool
}
@@ -49,24 +48,6 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM
return &c
}
-// NewOneOffServerConnection creates new one-off connection (only for sending a series of commands and then quit).
-func NewOneOffServerConnection(server string, userName string, authMethods []ssh.AuthMethod, handler handlers.Handler, commands []string) *ServerConnection {
- c := ServerConnection{
- server: server,
- handler: handler,
- commands: commands,
- config: &ssh.ClientConfig{
- User: userName,
- Auth: authMethods,
- HostKeyCallback: ssh.InsecureIgnoreHostKey(),
- },
- isOneOff: true,
- }
-
- c.initServerPort()
- return &c
-}
-
func (c *ServerConnection) Server() string {
return c.server
}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 7740aab..ae72c9b 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -20,14 +20,12 @@ type Serverless struct {
// NewServerConnection returns a new connection.
func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless {
- s := Serverless{
+ dlog.Client.Debug("Creating new serverless connector", handler, commands)
+ return &Serverless{
userName: userName,
handler: handler,
commands: commands,
}
-
- dlog.Client.Debug("Creating new serverless connector", handler, commands)
- return &s
}
func (s *Serverless) Server() string {
@@ -58,11 +56,17 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
return err
}
- serverHandler := serverHandlers.NewServerHandler(
- user,
- make(chan struct{}, config.Server.MaxConcurrentCats),
- make(chan struct{}, config.Server.MaxConcurrentTails),
- )
+ var serverHandler serverHandlers.Handler
+ switch s.userName {
+ case config.ControlUser:
+ serverHandler = serverHandlers.NewControlHandler(user)
+ default:
+ serverHandler = serverHandlers.NewServerHandler(
+ user,
+ make(chan struct{}, config.Server.MaxConcurrentCats),
+ make(chan struct{}, config.Server.MaxConcurrentTails),
+ )
+ }
terminate := func() {
serverHandler.Shutdown()
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index eca0348..4949985 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -1,90 +1,72 @@
package handlers
import (
- "bytes"
- "errors"
"fmt"
- "time"
+ "strings"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/protocol"
)
-// HealthHandler implements the handler required for health checks.
+// HealthHandler is the handler used on the client side for running mapreduce aggregations.
type HealthHandler struct {
- done *internal.Done
- // Buffer of incoming data from server.
- receiveBuf bytes.Buffer
- // To send commands to the server.
- commands chan string
- // To receive messages from the server.
- receive chan<- string
- // The remote server address
- server string
- // The return status.
- status int
+ baseHandler
+ HealthStatusCh chan<- int
}
-// NewHealthHandler returns a new health check handler.
-func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
- h := HealthHandler{
- server: server,
- receive: receive,
- commands: make(chan string),
- status: -1,
- done: internal.NewDone(),
+// NewHealthHandler returns a new health client handler.
+func NewHealthHandler(server string) *HealthHandler {
+ dlog.Client.Debug(server, "Creating new health handler")
+ return &HealthHandler{
+ baseHandler: baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ status: -1,
+ done: internal.NewDone(),
+ },
+ HealthStatusCh: make(chan int),
}
-
- return &h
-}
-
-// Server returns the remote server name.
-func (h *HealthHandler) Server() string {
- return h.server
-}
-
-// Status of the handler.
-func (h *HealthHandler) Status() int {
- return h.status
-}
-
-// Done returns done channel of the handler.
-func (h *HealthHandler) Done() <-chan struct{} {
- return h.done.Done()
}
-// Shutdown the handler.
-func (h *HealthHandler) Shutdown() {
- h.done.Shutdown()
-}
-
-// SendMessage sends a DTail command to the server.
-func (h *HealthHandler) SendMessage(command string) error {
- select {
- case h.commands <- fmt.Sprintf("%s;", command):
- case <-time.NewTimer(time.Second * 10).C:
- return errors.New("Timed out sending command " + command)
- case <-h.Done():
- }
-
- return nil
-}
-
-// Server writes byte stream to client.
+// Read data from the dtail server via Writer interface.
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf.WriteByte(b)
- if b == protocol.MessageDelimiter {
- h.receive <- h.receiveBuf.String()
- h.receiveBuf.Reset()
+ switch b {
+ case '\n':
+ continue
+ case protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ dlog.Client.Debug(message)
+ h.handleHealthMessage(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
return len(p), nil
}
-// Server reads byte stream from client.
-func (h *HealthHandler) Read(p []byte) (n int, err error) {
- n = copy(p, []byte(<-h.commands))
- return
+func (h *HealthHandler) handleHealthMessage(message string) {
+ s := strings.Split(message, protocol.FieldDelimiter)
+ message = s[len(s)-1]
+ status := strings.Split(message, ":")
+ fmt.Println(status)
+ /*
+ switch status {
+ case "OK":
+ h.HealthStatusCh <- 0
+ case "WARNING":
+ h.HealthStatusCh <- 1
+ case "CRITICAL":
+ h.HealthStatusCh <- 2
+ case "UNKNOWN":
+ h.HealthStatusCh <- 3
+ default:
+ fmt.Println("CRITICAL: Unexpected server response: '%s'")
+ h.HealthStatusCh <- 2
+ }
+ */
}
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index 47007b6..df919ae 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -1,101 +1,44 @@
package clients
import (
- "context"
- "fmt"
"runtime"
- "strings"
- "time"
- "github.com/mimecast/dtail/internal/clients/connectors"
"github.com/mimecast/dtail/internal/clients/handlers"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/omode"
- "github.com/mimecast/dtail/internal/protocol"
gossh "golang.org/x/crypto/ssh"
)
-// HealthClient is used for health checking (e.g. via Nagios)
+// HealthClient is used to perform a basic server health check.
type HealthClient struct {
- // Client operating mode
- mode omode.Mode
- // The remote server address
- server string
- // SSH user name
- userName string
- // SSH auth methods to use to connect to the remote servers.
- sshAuthMethods []gossh.AuthMethod
+ baseClient
}
-// NewHealthClient returns a new healh client.
-func NewHealthClient(mode omode.Mode) (*HealthClient, error) {
+// NewHealthClient returns a new health client.
+func NewHealthClient(args config.Args) (*HealthClient, error) {
+ args.Mode = omode.HealthClient
+ args.UserName = config.ControlUser
c := HealthClient{
- mode: mode,
- server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort),
- userName: config.ControlUser,
+ baseClient: baseClient{
+ Args: args,
+ throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
+ retry: false,
+ },
}
- c.initSSHAuthMethods()
+
+ c.init()
+ c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
+ c.makeConnections(c)
return &c, nil
}
-// Start the health client.
-func (c *HealthClient) Start(ctx context.Context) (status int) {
- receive := make(chan string)
-
- throttleCh := make(chan struct{}, runtime.NumCPU())
- statsCh := make(chan struct{}, 1)
-
- conn := connectors.NewOneOffServerConnection(
- c.server,
- c.userName,
- c.sshAuthMethods,
- handlers.NewHealthHandler(c.server, receive),
- []string{c.mode.String()},
- )
-
- connCtx, cancel := context.WithCancel(ctx)
- go conn.Start(connCtx, cancel, throttleCh, statsCh)
-
- for {
- select {
- case data := <-receive:
- // Parse recieved data.
- s := strings.Split(data, protocol.FieldDelimiter)
- message := s[len(s)-1]
- if strings.HasPrefix(message, "done;") {
- return
- }
-
- // Set severity.
- s = strings.Split(message, ":")
- switch s[0] {
- case "OK":
- case "WARNING":
- if status < 1 {
- status = 1
- }
- case "CRITICAL":
- status = 2
- case "UNKNOWN":
- status = 3
- default:
- fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message)
- status = 2
- return
- }
- fmt.Print(message)
-
- case <-time.After(time.Second * 2):
- status = 2
- fmt.Println("CRITICAL: Could not communicate with DTail server")
- return
- }
- }
+func (c HealthClient) makeHandler(server string) handlers.Handler {
+ return handlers.NewHealthHandler(server)
}
-// Initialize SSH auth methods.
-func (c *HealthClient) initSSHAuthMethods() {
- c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
+func (c HealthClient) makeCommands() (commands []string) {
+ commands = append(commands, "health")
+ return
}