diff options
Diffstat (limited to 'clients')
| -rw-r--r-- | clients/args.go | 26 | ||||
| -rw-r--r-- | clients/baseclient.go | 139 | ||||
| -rw-r--r-- | clients/catclient.go | 49 | ||||
| -rw-r--r-- | clients/client.go | 9 | ||||
| -rw-r--r-- | clients/connectionmaker.go | 12 | ||||
| -rw-r--r-- | clients/grepclient.go | 49 | ||||
| -rw-r--r-- | clients/handlers/basehandler.go | 134 | ||||
| -rw-r--r-- | clients/handlers/clienthandler.go | 26 | ||||
| -rw-r--r-- | clients/handlers/handler.go | 12 | ||||
| -rw-r--r-- | clients/handlers/healthhandler.go | 75 | ||||
| -rw-r--r-- | clients/handlers/maprhandler.go | 74 | ||||
| -rw-r--r-- | clients/healthclient.go | 96 | ||||
| -rw-r--r-- | clients/maprclient.go | 153 | ||||
| -rw-r--r-- | clients/remote/connection.go | 230 | ||||
| -rw-r--r-- | clients/stats.go | 81 | ||||
| -rw-r--r-- | clients/tailclient.go | 44 |
16 files changed, 1209 insertions, 0 deletions
diff --git a/clients/args.go b/clients/args.go new file mode 100644 index 0000000..4d5a029 --- /dev/null +++ b/clients/args.go @@ -0,0 +1,26 @@ +package clients + +import ( + "dtail/omode" +) + +// Args is a helper struct to summarize common client arguments. +type Args struct { + // The operating mode (tail, grep, ...) + Mode omode.Mode + // The raw server string + ServersStr string + // SSH user name (e.g. 'pbuetow') + UserName string + // The files to follow. + Files string + // Regex for filtering. + Regex string + // Trust all unknown host keys? + TrustAllHosts bool + // Server discovery method + Discovery string + MaxInitConnections int + // Server ping timeout (0 means pings disabled) + PingTimeout int +} diff --git a/clients/baseclient.go b/clients/baseclient.go new file mode 100644 index 0000000..3a1b8f0 --- /dev/null +++ b/clients/baseclient.go @@ -0,0 +1,139 @@ +package clients + +import ( + "dtail/clients/remote" + "dtail/discovery" + "dtail/logger" + "dtail/omode" + "dtail/ssh/client" + "regexp" + "sync" + "time" + + gossh "golang.org/x/crypto/ssh" +) + +// This is the main client data structure. +type baseClient struct { + Args + // To display client side stats + stats *stats + // List of remote servers to connect to. + servers []string + // We have one connection per remote server. + connections []*remote.Connection + // SSH auth methods to use to connect to the remote servers. + sshAuthMethods []gossh.AuthMethod + // To deal with SSH host keys + hostKeyCallback *client.HostKeyCallback + // To stop the client. + stop chan struct{} + // To indicate that the client has stopped. + stopped chan struct{} + // Throttle how fast we initiate SSH connections concurrently + throttleCh chan struct{} + // Retry connection upon failure? + retry bool + // Connection helper. + maker connectionMaker +} + +func (c *baseClient) init(maker connectionMaker) { + logger.Info("Initiating base client") + + c.maker = maker + //c.connections = make(map[string]*remote.Connection) + c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.TrustAllHosts, c.throttleCh) + + // Retrieve a shuffled list of remote dtail servers. + shuffleServers := true + discoveryService := discovery.New(c.Discovery, c.ServersStr, shuffleServers) + for _, server := range discoveryService.ServerList() { + c.connections = append(c.connections, c.maker.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) + } + + if _, err := regexp.Compile(c.Regex); err != nil { + logger.FatalExit(c.Regex, "Can't test compile regex", err) + } + + // Periodically check for unknown hosts, and ask the user whether to trust them or not. + go c.hostKeyCallback.PromptAddHosts(c.stop) + + // Periodically print out connection stats to the client. + c.stats = newTailStats(len(c.connections)) + go c.stats.periodicLogStats(c.throttleCh, c.stop) +} + +func (c *baseClient) Start(wg *sync.WaitGroup) (status int) { + if wg != nil { + defer wg.Done() + } + active := make(chan struct{}, len(c.connections)) + + var wg2 sync.WaitGroup + wg2.Add(len(c.connections)) + + for i, conn := range c.connections { + go func(i int, conn *remote.Connection) { + active <- struct{}{} + defer func() { + logger.Debug(conn.Server, "Disconnected completely...") + <-active + }() + wg2.Done() + + for { + conn.Start(c.throttleCh, c.stats.connectionsEstCh) + if !c.retry { + return + } + time.Sleep(time.Second * 2) + logger.Debug(conn.Server, "Reconencting") + conn = c.maker.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback) + c.connections[i] = conn + } + }(i, conn) + } + + wg2.Wait() + c.waitUntilDone(active) + + return +} + +func (c *baseClient) waitUntilDone(active chan struct{}) { + defer close(c.stopped) + + if c.Mode != omode.TailClient { + c.waitUntilZero(active) + logger.Info("All connections stopped") + return + } + + <-c.stop + logger.Info("Stopping client") + for _, conn := range c.connections { + conn.Stop() + } + + c.waitUntilZero(active) +} + +func (c *baseClient) waitUntilZero(active chan struct{}) { + for { + logger.Debug("Active connections", len(active)) + if len(active) == 0 { + return + } + time.Sleep(time.Second) + } +} + +func (c *baseClient) Stop() { + close(c.stop) + <-c.WaitC() +} + +func (c *baseClient) WaitC() <-chan struct{} { + return c.stopped +} diff --git a/clients/catclient.go b/clients/catclient.go new file mode 100644 index 0000000..e3b873c --- /dev/null +++ b/clients/catclient.go @@ -0,0 +1,49 @@ +package clients + +import ( + "dtail/clients/handlers" + "dtail/clients/remote" + "dtail/ssh/client" + "errors" + "fmt" + "strings" + + gossh "golang.org/x/crypto/ssh" +) + +// CatClient is a client for returning a whole file from the beginning to the end. +type CatClient struct { + baseClient +} + +// NewCatClient returns a new cat client. +func NewCatClient(args Args) (*CatClient, error) { + if args.Regex != "" { + return nil, errors.New("Can't use regex with 'cat' operating mode") + } + + args.Regex = "." + + c := CatClient{ + baseClient: baseClient{ + Args: args, + stop: make(chan struct{}), + stopped: make(chan struct{}), + throttleCh: make(chan struct{}, args.MaxInitConnections), + retry: false, + }, + } + + c.init(c) + + return &c, nil +} + +func (c CatClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { + conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) + conn.Handler = handlers.NewClientHandler(server, c.PingTimeout) + for _, file := range strings.Split(c.Files, ",") { + conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) + } + return conn +} diff --git a/clients/client.go b/clients/client.go new file mode 100644 index 0000000..e58f51d --- /dev/null +++ b/clients/client.go @@ -0,0 +1,9 @@ +package clients + +import "sync" + +// Client is the interface for the end user command line client. +type Client interface { + Start(wg *sync.WaitGroup) int + Stop() +} diff --git a/clients/connectionmaker.go b/clients/connectionmaker.go new file mode 100644 index 0000000..9e08c2b --- /dev/null +++ b/clients/connectionmaker.go @@ -0,0 +1,12 @@ +package clients + +import ( + "dtail/clients/remote" + "dtail/ssh/client" + + gossh "golang.org/x/crypto/ssh" +) + +type connectionMaker interface { + makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection +} diff --git a/clients/grepclient.go b/clients/grepclient.go new file mode 100644 index 0000000..dbae96c --- /dev/null +++ b/clients/grepclient.go @@ -0,0 +1,49 @@ +package clients + +import ( + "dtail/clients/handlers" + "dtail/clients/remote" + "dtail/ssh/client" + "errors" + "fmt" + "strings" + + gossh "golang.org/x/crypto/ssh" +) + +// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed. +type GrepClient struct { + baseClient +} + +// NewGrepClient creates a new grep client. +func NewGrepClient(args Args) (*GrepClient, error) { + if args.Regex == "" { + return nil, errors.New("No regex specified, use '-regex' flag") + } + + c := GrepClient{ + baseClient: baseClient{ + Args: args, + stop: make(chan struct{}), + stopped: make(chan struct{}), + throttleCh: make(chan struct{}, args.MaxInitConnections), + retry: false, + }, + } + + c.init(c) + + return &c, nil +} + +func (c GrepClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { + conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) + conn.Handler = handlers.NewClientHandler(server, c.PingTimeout) + + for _, file := range strings.Split(c.Files, ",") { + conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) + } + + return conn +} diff --git a/clients/handlers/basehandler.go b/clients/handlers/basehandler.go new file mode 100644 index 0000000..ce82aa2 --- /dev/null +++ b/clients/handlers/basehandler.go @@ -0,0 +1,134 @@ +package handlers + +import ( + "dtail/logger" + "errors" + "fmt" + "io" + "strings" + "time" +) + +type baseHandler struct { + server string + shellStarted bool + commands chan string + pong chan struct{} + receiveBuf []byte + stop chan struct{} + pingTimeout int +} + +func (h *baseHandler) Server() string { + return h.server +} + +// Used to determine whether server is still responding to requests or not. +func (h *baseHandler) Ping() error { + if h.pingTimeout == 0 { + // Server ping disabled + return nil + } + + if err := h.SendCommand("ping"); err != nil { + return err + } + + select { + case <-h.pong: + return nil + case <-time.After(time.Duration(h.pingTimeout) * time.Second): + } + + return errors.New("Didn't receive any server pongs (ping replies)") +} + +func (h *baseHandler) SendCommand(command string) error { + if command == "ping" { + logger.Trace("Sending command", h.server, command) + } else { + logger.Debug("Sending command", h.server, command) + } + + select { + case h.commands <- fmt.Sprintf("%s;", command): + case <-time.After(time.Second * 5): + return errors.New("Timed out sending command " + command) + case <-h.stop: + } + + return nil +} + +// Read data from the dtail server via Writer interface. +func (h *baseHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.receiveBuf = append(h.receiveBuf, b) + if b == '\n' { + if len(h.receiveBuf) == 0 { + continue + } + message := string(h.receiveBuf) + h.handleMessageType(message) + } + } + + return len(p), nil +} + +// Send data to the dtail server via Reader interface. +func (h *baseHandler) Read(p []byte) (n int, err error) { + select { + case command := <-h.commands: + n = copy(p, []byte(command)) + case <-h.stop: + return 0, io.EOF + } + return +} + +// Handle various message types. +func (h *baseHandler) handleMessageType(message string) { + if len(h.receiveBuf) == 0 { + return + } + // Hidden server commands starti with a dot "." + if h.receiveBuf[0] == '.' { + h.handleHiddenMessage(message) + h.receiveBuf = h.receiveBuf[:0] + return + } + + // Silent mode will only print out remote logs but not remote server + // commands. But remote server commands will be still logged to ./log/. + if logger.Mode == logger.SilentMode { + if h.receiveBuf[0] == 'R' { + logger.Raw(message) + } + h.receiveBuf = h.receiveBuf[:0] + return + } + logger.Raw(message) + h.receiveBuf = h.receiveBuf[:0] +} + +// Handle messages received from server which are not meant to be displayed +// to the end user. +func (h *baseHandler) handleHiddenMessage(message string) { + switch { + case strings.HasPrefix(message, ".pong"): + h.pong <- struct{}{} + case strings.HasPrefix(message, ".syn close connection"): + h.SendCommand("ack close connection") + } +} + +// Stop the handler. +func (h *baseHandler) Stop() { + select { + case <-h.stop: + default: + logger.Debug("Stopping base handler", h.server) + close(h.stop) + } +} diff --git a/clients/handlers/clienthandler.go b/clients/handlers/clienthandler.go new file mode 100644 index 0000000..e818b52 --- /dev/null +++ b/clients/handlers/clienthandler.go @@ -0,0 +1,26 @@ +package handlers + +import ( + "dtail/logger" +) + +// ClientHandler is the basic client handler interface. +type ClientHandler struct { + baseHandler +} + +// NewClientHandler creates a new client handler. +func NewClientHandler(server string, pingTimeout int) *ClientHandler { + logger.Debug(server, "Creating new client handler") + + return &ClientHandler{ + baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + pong: make(chan struct{}, 1), + stop: make(chan struct{}), + pingTimeout: pingTimeout, + }, + } +} diff --git a/clients/handlers/handler.go b/clients/handlers/handler.go new file mode 100644 index 0000000..2013be0 --- /dev/null +++ b/clients/handlers/handler.go @@ -0,0 +1,12 @@ +package handlers + +import "io" + +// Handler provides all methods which can be run on any client handler. +type Handler interface { + io.ReadWriter + Ping() error + Stop() + SendCommand(command string) error + Server() string +} diff --git a/clients/handlers/healthhandler.go b/clients/handlers/healthhandler.go new file mode 100644 index 0000000..4051e2c --- /dev/null +++ b/clients/handlers/healthhandler.go @@ -0,0 +1,75 @@ +package handlers + +import ( + "errors" + "fmt" + "time" +) + +// HealthHandler implements the handler required for health checks. +type HealthHandler struct { + // Buffer of incoming data from server. + receiveBuf []byte + // To send commands to the server. + commands chan string + // To receive messages from the server. + receive chan<- string + // The remote server address + server string +} + +// NewHealthHandler returns a new health check handler. +func NewHealthHandler(server string, receive chan<- string) *HealthHandler { + h := HealthHandler{ + server: server, + receive: receive, + commands: make(chan string), + } + + return &h +} + +// Server returns the remote server name. +func (h *HealthHandler) Server() string { + return h.server +} + +// Stop is not of use for health check handler. +func (h *HealthHandler) Stop() { + // Nothing done here. +} + +// Ping is not of use for health check handler. +func (h *HealthHandler) Ping() error { + return nil +} + +// SendCommand send a DTail command to the server. +func (h *HealthHandler) SendCommand(command string) error { + select { + case h.commands <- fmt.Sprintf("%s;", command): + case <-time.NewTimer(time.Second * 10).C: + return errors.New("Timed out sending command " + command) + } + + return nil +} + +// Server writes byte stream to client. +func (h *HealthHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.receiveBuf = append(h.receiveBuf, b) + if b == '\n' { + h.receive <- string(h.receiveBuf) + h.receiveBuf = h.receiveBuf[:0] + } + } + + return len(p), nil +} + +// Server reads byte stream from client. +func (h *HealthHandler) Read(p []byte) (n int, err error) { + n = copy(p, []byte(<-h.commands)) + return +} diff --git a/clients/handlers/maprhandler.go b/clients/handlers/maprhandler.go new file mode 100644 index 0000000..830a142 --- /dev/null +++ b/clients/handlers/maprhandler.go @@ -0,0 +1,74 @@ +package handlers + +import ( + "dtail/logger" + "dtail/mapr" + "dtail/mapr/client" + "strings" +) + +// MaprHandler is the handler used on the client side for running mapreduce aggregations. +type MaprHandler struct { + baseHandler + aggregate *client.Aggregate + query *mapr.Query + count uint64 +} + +// NewMaprHandler returns a new mapreduce client handler. +func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler { + return &MaprHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + pong: make(chan struct{}, 1), + stop: make(chan struct{}), + pingTimeout: pingTimeout, + }, + query: query, + aggregate: client.NewAggregate(server, query, globalGroup), + } +} + +// Read data from the dtail server via Writer interface. +func (h *MaprHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) + if b == '\n' { + if len(h.baseHandler.receiveBuf) == 0 { + continue + } + message := string(h.baseHandler.receiveBuf) + + if h.baseHandler.receiveBuf[0] == 'A' { + h.handleAggregateMessage(strings.TrimSpace(message)) + h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] + continue + } + h.baseHandler.handleMessageType(message) + } + } + + return len(p), nil +} + +// Handle a message received from server including mapr aggregation +// related data. +func (h *MaprHandler) handleAggregateMessage(message string) { + h.count++ + parts := strings.Split(message, "|") + + // Index 0 contains 'AGGREGATE', 1 contains server host. + // Aggregation data begins from index 2. + logger.Debug("Received aggregate data", h.server, h.count) + h.aggregate.Aggregate(parts[2:]) + logger.Debug("Aggregated aggregate data", h.server, h.count) +} + +// Stop stops the mapreduce client handler. +func (h *MaprHandler) Stop() { + logger.Debug("Stopping mapreduce handler", h.server) + h.aggregate.Stop() + h.baseHandler.Stop() +} diff --git a/clients/healthclient.go b/clients/healthclient.go new file mode 100644 index 0000000..1fae99c --- /dev/null +++ b/clients/healthclient.go @@ -0,0 +1,96 @@ +package clients + +import ( + "dtail/clients/handlers" + "dtail/clients/remote" + "dtail/config" + "dtail/omode" + "fmt" + "runtime" + "strings" + "sync" + "time" + + gossh "golang.org/x/crypto/ssh" +) + +// HealthClient is used for health checking (e.g. via Nagios) +type HealthClient struct { + // Client operating mode + mode omode.Mode + // The remote server address + server string + // SSH user name + userName string + // SSH auth methods to use to connect to the remote servers. + sshAuthMethods []gossh.AuthMethod +} + +// NewHealthClient returns a new healh client. +func NewHealthClient(mode omode.Mode) (*HealthClient, error) { + c := HealthClient{ + mode: mode, + server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort), + userName: config.ControlUser, + } + c.initSSHAuthMethods() + + return &c, nil +} + +// Start the health client. +func (c *HealthClient) Start(wg *sync.WaitGroup) (status int) { + defer wg.Done() + receive := make(chan string) + + throttleCh := make(chan struct{}, runtime.NumCPU()) + statsCh := make(chan struct{}, 1) + + conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods) + conn.Handler = handlers.NewHealthHandler(c.server, receive) + conn.Commands = []string{c.mode.String()} + + go conn.Start(throttleCh, statsCh) + defer conn.Stop() + + for { + select { + case data := <-receive: + // Parse recieved data. + s := strings.Split(data, "|") + message := s[len(s)-1] + if strings.HasPrefix(message, "done;") { + return + } + + // Set severity. + s = strings.Split(message, ":") + switch s[0] { + case "OK": + case "WARNING": + if status < 1 { + status = 1 + } + case "CRITICAL": + status = 2 + case "UNKNOWN": + status = 3 + default: + fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message) + status = 2 + return + } + fmt.Print(message) + + case <-time.After(time.Second * 2): + status = 2 + fmt.Println("CRITICAL: Could not communicate with DTail server") + return + } + } +} + +// Initialize SSH auth methods. +func (c *HealthClient) initSSHAuthMethods() { + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) +} diff --git a/clients/maprclient.go b/clients/maprclient.go new file mode 100644 index 0000000..ad707c9 --- /dev/null +++ b/clients/maprclient.go @@ -0,0 +1,153 @@ +package clients + +import ( + "dtail/clients/handlers" + "dtail/clients/remote" + "dtail/logger" + "dtail/mapr" + "dtail/omode" + "dtail/ssh/client" + "errors" + "fmt" + "strings" + "sync" + "time" + + gossh "golang.org/x/crypto/ssh" +) + +// MaprClient is used for running mapreduce aggregations on remote files. +type MaprClient struct { + baseClient + // Query string for mapr aggregations + queryStr string + // Global group set for merged mapr aggregation results + globalGroup *mapr.GlobalGroupSet + // The query object (constructed from queryStr) + query *mapr.Query + // Additative result or new result every run? + additative bool +} + +// NewMaprClient returns a new mapreduce client. +func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { + if queryStr == "" { + return nil, errors.New("No mapreduce query specified, use '-query' flag") + } + + c := MaprClient{ + baseClient: baseClient{ + Args: args, + stop: make(chan struct{}), + stopped: make(chan struct{}), + throttleCh: make(chan struct{}, args.MaxInitConnections), + retry: args.Mode == omode.TailClient, + }, + queryStr: queryStr, + additative: args.Mode == omode.MapClient, + } + + query, err := mapr.NewQuery(c.queryStr) + if err != nil { + logger.FatalExit(c.queryStr, "Can't parse mapr query", err) + } + + c.query = query + + switch c.query.Table { + case "*": + c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|") + case ".": + c.Regex = "." + default: + c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) + } + + c.globalGroup = mapr.NewGlobalGroupSet() + c.baseClient.init(c) + + return &c, nil +} + +func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { + conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) + conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout) + + conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery)) + commandStr := "tail" + if c.additative { + commandStr = "cat" + } + + for _, file := range strings.Split(c.Files, ",") { + conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex)) + } + + return conn +} + +// Start starts the mapreduce client. +func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) { + defer wg.Done() + + if c.query.Outfile == "" { + // Only print out periodic results if we don't write an outfile + go c.periodicPrintResults() + } + + status = c.baseClient.Start(nil) + if c.additative { + c.recievedFinalResult() + } + c.baseClient.Stop() + + return +} + +func (c *MaprClient) recievedFinalResult() { + logger.Info("Received final mapreduce result") + + if c.query.Outfile == "" { + c.printResults() + return + } + + logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile)) + err := c.globalGroup.WriteResult(c.query) + if err != nil { + logger.FatalExit(err) + return + } + logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) +} + +func (c *MaprClient) periodicPrintResults() { + for { + select { + case <-time.After(c.query.Interval): + logger.Info("Gathering interim mapreduce result") + c.printResults() + case <-c.baseClient.stop: + return + } + } +} + +func (c *MaprClient) printResults() { + var result string + var err error + var numLines int + + if c.additative { + result, numLines, err = c.globalGroup.Result(c.query) + } else { + result, numLines, err = c.globalGroup.SwapOut().Result(c.query) + } + if err != nil { + logger.FatalExit(err) + } + if numLines > 0 { + logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) + logger.Raw(result) + } +} diff --git a/clients/remote/connection.go b/clients/remote/connection.go new file mode 100644 index 0000000..bd93239 --- /dev/null +++ b/clients/remote/connection.go @@ -0,0 +1,230 @@ +package remote + +import ( + "dtail/clients/handlers" + "dtail/config" + "dtail/logger" + "dtail/ssh/client" + "fmt" + "io" + "strconv" + "strings" + "time" + + "golang.org/x/crypto/ssh" +) + +// Connection represents a client connection connection to a single server. +type Connection struct { + // The remote server's hostname connected to. + Server string + // The remote server's port connected to. + port int + // The SSH client configuration used. + config *ssh.ClientConfig + // The SSH client handler to use. + Handler handlers.Handler + // DTail commands sent from client to server. When client loses + // connection to the server it re-connects automatically and sends the + // same commands again. + Commands []string + // Is it a persistent connection or a one-off? + isOneOff bool + // Used to stop the connection + stop chan struct{} + // To deal with SSH server host keys + hostKeyCallback *client.HostKeyCallback +} + +// NewConnection returns a new connection. +func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *Connection { + logger.Debug(server, "Creating new connection") + + c := Connection{ + hostKeyCallback: hostKeyCallback, + config: &ssh.ClientConfig{ + User: userName, + Auth: authMethods, + HostKeyCallback: hostKeyCallback.Wrap(), + Timeout: time.Second * 3, + }, + stop: make(chan struct{}), + } + + c.initServerPort(server) + + return &c +} + +// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit). +func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection { + c := Connection{ + config: &ssh.ClientConfig{ + User: userName, + Auth: authMethods, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + }, + stop: make(chan struct{}), + isOneOff: true, + } + + c.initServerPort(server) + + return &c +} + +// Attempt to parse the server port address from the provided server FQDN. +func (c *Connection) initServerPort(server string) { + c.Server = server + c.port = config.Common.SSHPort + parts := strings.Split(server, ":") + + if len(parts) == 2 { + logger.Debug("Parsing port from hostname", parts) + port, err := strconv.Atoi(parts[1]) + if err != nil { + logger.FatalExit("Unable to parse client port", server, parts, err) + } + c.Server = parts[0] + c.port = port + } +} + +// Start the server connection. Build up SSH session and send some DTail commandc. +func (c *Connection) Start(throttleCh, statsCh chan struct{}) { + select { + case <-c.stop: + logger.Info(c.Server, c.port, "Disconnecting client") + return + default: + } + + // Wait for SSH connection throttler + throttleCh <- struct{}{} + + // Wait until connection has been initiated or an error occured + // during initialization. + throttleStopCh := make(chan struct{}, 2) + go func() { + <-throttleStopCh + <-throttleCh + }() + + if err := c.dial(c.Server, c.port, throttleStopCh, statsCh); err != nil { + logger.Warn(c.Server, c.port, err) + throttleStopCh <- struct{}{} + + if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) { + logger.Debug("Not trusting host, not trying to re-connect", c.Server, c.port) + return + } + } +} + +// Dail into a new SSH connection. Close connection in case of an error. +func (c *Connection) dial(host string, port int, throttleStopCh, statsCh chan struct{}) error { + statsCh <- struct{}{} + defer func() { <-statsCh }() + + logger.Debug(host, "dial") + address := fmt.Sprintf("%s:%d", host, port) + + client, err := ssh.Dial("tcp", address, c.config) + if err != nil { + return err + } + defer client.Close() + + return c.session(client, throttleStopCh) +} + +// Create the SSH session. Close the session in case of an error. +func (c *Connection) session(client *ssh.Client, throttleStopCh chan<- struct{}) error { + logger.Debug(c.Server, "session") + + session, err := client.NewSession() + if err != nil { + return err + } + defer session.Close() + + return c.handle(session, throttleStopCh) +} + +// Handle the SSH session. Also send periodic pings to the server in order +// to determine that session is still intact. +func (c *Connection) handle(session *ssh.Session, throttleStopCh chan<- struct{}) error { + defer c.Handler.Stop() + + logger.Debug(c.Server, "handle") + + stdinPipe, err := session.StdinPipe() + if err != nil { + return err + } + + stdoutPipe, err := session.StdoutPipe() + if err != nil { + return err + } + + if err := session.Shell(); err != nil { + return err + } + + // Establish Bi-directional pipe between SSH session and client handler. + brokenStdinPipe := make(chan struct{}) + go func() { + defer close(brokenStdinPipe) + io.Copy(stdinPipe, c.Handler) + }() + + brokenStdoutPipe := make(chan struct{}) + go func() { + defer close(brokenStdoutPipe) + io.Copy(c.Handler, stdoutPipe) + }() + + // SSH session established, other goroutine can initiate session now. + throttleStopCh <- struct{}{} + + // Send all commands to client. + for _, command := range c.Commands { + logger.Debug(command) + c.Handler.SendCommand(command) + } + + if !c.isOneOff { + return c.periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe) + } + + <-c.stop + + // Normal shutdown, all fine + return nil +} + +// Periodically check whether connection is still alive or not. +func (c *Connection) periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe <-chan struct{}) error { + for { + select { + case <-time.After(time.Second * 3): + if err := c.Handler.Ping(); err != nil { + return err + } + case <-brokenStdinPipe: + logger.Debug("Broken stdin pipe", c.Server, c.port) + return nil + case <-brokenStdoutPipe: + logger.Debug("Broken stdout pipe", c.Server, c.port) + return nil + case <-c.stop: + return nil + } + } +} + +// Stop the connection. +func (c *Connection) Stop() { + close(c.stop) +} diff --git a/clients/stats.go b/clients/stats.go new file mode 100644 index 0000000..e5b9bed --- /dev/null +++ b/clients/stats.go @@ -0,0 +1,81 @@ +package clients + +import ( + "dtail/logger" + "fmt" + "runtime" + "sync" + "time" +) + +// Used to collect and display various client stats. +type stats struct { + // Total amount servers to connect to. + connectionsTotal int + // To keep track of what connected and disconnected + connectionsEstCh chan struct{} + // Amount of servers connections are established. + connected int + // To synchronize concurrent access. + mutex sync.Mutex +} + +func newTailStats(connectionsTotal int) *stats { + return &stats{ + connectionsTotal: connectionsTotal, + connectionsEstCh: make(chan struct{}, connectionsTotal), + connected: 0, + } +} + +func (s *stats) periodicLogStats(throttleCh chan struct{}, stop <-chan struct{}) { + connectedLast := 0 + statsInterval := 5 + + for { + select { + case <-time.After(time.Second * time.Duration(statsInterval)): + case <-stop: + return + } + + connected := len(s.connectionsEstCh) + throttle := len(throttleCh) + + newConnections := connected - connectedLast + connectionsPerSecond := float64(newConnections) / float64(statsInterval) + s.log(connected, newConnections, connectionsPerSecond, throttle) + + connectedLast = connected + + s.mutex.Lock() + s.connected = connected + s.mutex.Unlock() + } +} + +func (s *stats) numConnected() int { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.connected +} + +func (s *stats) log(connected, newConnections int, connectionsPerSecond float64, throttle int) { + percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) + + connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected)) + newConnStr := fmt.Sprintf("new=%d", newConnections) + rateStr := fmt.Sprintf("rate=%2.2f/s", connectionsPerSecond) + throttleStr := fmt.Sprintf("throttle=%d", throttle) + cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine()) + + logger.Info("stats", connectedStr, newConnStr, rateStr, throttleStr, cpusGoroutinesStr) +} + +func percentOf(total float64, value float64) float64 { + if total == 0 || total == value { + return 100 + } + return value / (total / 100.0) +} diff --git a/clients/tailclient.go b/clients/tailclient.go new file mode 100644 index 0000000..cb93258 --- /dev/null +++ b/clients/tailclient.go @@ -0,0 +1,44 @@ +package clients + +import ( + "dtail/clients/handlers" + "dtail/clients/remote" + "dtail/ssh/client" + "fmt" + "strings" + + gossh "golang.org/x/crypto/ssh" +) + +// TailClient is used for tailing remote log files (opening, seeking to the end and returning only new incoming lines). +type TailClient struct { + baseClient +} + +// NewTailClient returns a new TailClient. +func NewTailClient(args Args) (*TailClient, error) { + c := TailClient{ + baseClient: baseClient{ + Args: args, + stop: make(chan struct{}), + stopped: make(chan struct{}), + throttleCh: make(chan struct{}, args.MaxInitConnections), + retry: true, + }, + } + + c.init(c) + + return &c, nil +} + +func (c TailClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { + conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) + conn.Handler = handlers.NewClientHandler(server, c.PingTimeout) + + for _, file := range strings.Split(c.Files, ",") { + conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) + } + + return conn +} |
