summaryrefslogtreecommitdiff
path: root/clients
diff options
context:
space:
mode:
Diffstat (limited to 'clients')
-rw-r--r--clients/args.go26
-rw-r--r--clients/baseclient.go139
-rw-r--r--clients/catclient.go49
-rw-r--r--clients/client.go9
-rw-r--r--clients/connectionmaker.go12
-rw-r--r--clients/grepclient.go49
-rw-r--r--clients/handlers/basehandler.go134
-rw-r--r--clients/handlers/clienthandler.go26
-rw-r--r--clients/handlers/handler.go12
-rw-r--r--clients/handlers/healthhandler.go75
-rw-r--r--clients/handlers/maprhandler.go74
-rw-r--r--clients/healthclient.go96
-rw-r--r--clients/maprclient.go153
-rw-r--r--clients/remote/connection.go230
-rw-r--r--clients/stats.go81
-rw-r--r--clients/tailclient.go44
16 files changed, 1209 insertions, 0 deletions
diff --git a/clients/args.go b/clients/args.go
new file mode 100644
index 0000000..4d5a029
--- /dev/null
+++ b/clients/args.go
@@ -0,0 +1,26 @@
+package clients
+
+import (
+ "dtail/omode"
+)
+
+// Args is a helper struct to summarize common client arguments.
+type Args struct {
+ // The operating mode (tail, grep, ...)
+ Mode omode.Mode
+ // The raw server string
+ ServersStr string
+ // SSH user name (e.g. 'pbuetow')
+ UserName string
+ // The files to follow.
+ Files string
+ // Regex for filtering.
+ Regex string
+ // Trust all unknown host keys?
+ TrustAllHosts bool
+ // Server discovery method
+ Discovery string
+ MaxInitConnections int
+ // Server ping timeout (0 means pings disabled)
+ PingTimeout int
+}
diff --git a/clients/baseclient.go b/clients/baseclient.go
new file mode 100644
index 0000000..3a1b8f0
--- /dev/null
+++ b/clients/baseclient.go
@@ -0,0 +1,139 @@
+package clients
+
+import (
+ "dtail/clients/remote"
+ "dtail/discovery"
+ "dtail/logger"
+ "dtail/omode"
+ "dtail/ssh/client"
+ "regexp"
+ "sync"
+ "time"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// This is the main client data structure.
+type baseClient struct {
+ Args
+ // To display client side stats
+ stats *stats
+ // List of remote servers to connect to.
+ servers []string
+ // We have one connection per remote server.
+ connections []*remote.Connection
+ // SSH auth methods to use to connect to the remote servers.
+ sshAuthMethods []gossh.AuthMethod
+ // To deal with SSH host keys
+ hostKeyCallback *client.HostKeyCallback
+ // To stop the client.
+ stop chan struct{}
+ // To indicate that the client has stopped.
+ stopped chan struct{}
+ // Throttle how fast we initiate SSH connections concurrently
+ throttleCh chan struct{}
+ // Retry connection upon failure?
+ retry bool
+ // Connection helper.
+ maker connectionMaker
+}
+
+func (c *baseClient) init(maker connectionMaker) {
+ logger.Info("Initiating base client")
+
+ c.maker = maker
+ //c.connections = make(map[string]*remote.Connection)
+ c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.TrustAllHosts, c.throttleCh)
+
+ // Retrieve a shuffled list of remote dtail servers.
+ shuffleServers := true
+ discoveryService := discovery.New(c.Discovery, c.ServersStr, shuffleServers)
+ for _, server := range discoveryService.ServerList() {
+ c.connections = append(c.connections, c.maker.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback))
+ }
+
+ if _, err := regexp.Compile(c.Regex); err != nil {
+ logger.FatalExit(c.Regex, "Can't test compile regex", err)
+ }
+
+ // Periodically check for unknown hosts, and ask the user whether to trust them or not.
+ go c.hostKeyCallback.PromptAddHosts(c.stop)
+
+ // Periodically print out connection stats to the client.
+ c.stats = newTailStats(len(c.connections))
+ go c.stats.periodicLogStats(c.throttleCh, c.stop)
+}
+
+func (c *baseClient) Start(wg *sync.WaitGroup) (status int) {
+ if wg != nil {
+ defer wg.Done()
+ }
+ active := make(chan struct{}, len(c.connections))
+
+ var wg2 sync.WaitGroup
+ wg2.Add(len(c.connections))
+
+ for i, conn := range c.connections {
+ go func(i int, conn *remote.Connection) {
+ active <- struct{}{}
+ defer func() {
+ logger.Debug(conn.Server, "Disconnected completely...")
+ <-active
+ }()
+ wg2.Done()
+
+ for {
+ conn.Start(c.throttleCh, c.stats.connectionsEstCh)
+ if !c.retry {
+ return
+ }
+ time.Sleep(time.Second * 2)
+ logger.Debug(conn.Server, "Reconencting")
+ conn = c.maker.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback)
+ c.connections[i] = conn
+ }
+ }(i, conn)
+ }
+
+ wg2.Wait()
+ c.waitUntilDone(active)
+
+ return
+}
+
+func (c *baseClient) waitUntilDone(active chan struct{}) {
+ defer close(c.stopped)
+
+ if c.Mode != omode.TailClient {
+ c.waitUntilZero(active)
+ logger.Info("All connections stopped")
+ return
+ }
+
+ <-c.stop
+ logger.Info("Stopping client")
+ for _, conn := range c.connections {
+ conn.Stop()
+ }
+
+ c.waitUntilZero(active)
+}
+
+func (c *baseClient) waitUntilZero(active chan struct{}) {
+ for {
+ logger.Debug("Active connections", len(active))
+ if len(active) == 0 {
+ return
+ }
+ time.Sleep(time.Second)
+ }
+}
+
+func (c *baseClient) Stop() {
+ close(c.stop)
+ <-c.WaitC()
+}
+
+func (c *baseClient) WaitC() <-chan struct{} {
+ return c.stopped
+}
diff --git a/clients/catclient.go b/clients/catclient.go
new file mode 100644
index 0000000..e3b873c
--- /dev/null
+++ b/clients/catclient.go
@@ -0,0 +1,49 @@
+package clients
+
+import (
+ "dtail/clients/handlers"
+ "dtail/clients/remote"
+ "dtail/ssh/client"
+ "errors"
+ "fmt"
+ "strings"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// CatClient is a client for returning a whole file from the beginning to the end.
+type CatClient struct {
+ baseClient
+}
+
+// NewCatClient returns a new cat client.
+func NewCatClient(args Args) (*CatClient, error) {
+ if args.Regex != "" {
+ return nil, errors.New("Can't use regex with 'cat' operating mode")
+ }
+
+ args.Regex = "."
+
+ c := CatClient{
+ baseClient: baseClient{
+ Args: args,
+ stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ throttleCh: make(chan struct{}, args.MaxInitConnections),
+ retry: false,
+ },
+ }
+
+ c.init(c)
+
+ return &c, nil
+}
+
+func (c CatClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
+ conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
+ conn.Handler = handlers.NewClientHandler(server, c.PingTimeout)
+ for _, file := range strings.Split(c.Files, ",") {
+ conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
+ }
+ return conn
+}
diff --git a/clients/client.go b/clients/client.go
new file mode 100644
index 0000000..e58f51d
--- /dev/null
+++ b/clients/client.go
@@ -0,0 +1,9 @@
+package clients
+
+import "sync"
+
+// Client is the interface for the end user command line client.
+type Client interface {
+ Start(wg *sync.WaitGroup) int
+ Stop()
+}
diff --git a/clients/connectionmaker.go b/clients/connectionmaker.go
new file mode 100644
index 0000000..9e08c2b
--- /dev/null
+++ b/clients/connectionmaker.go
@@ -0,0 +1,12 @@
+package clients
+
+import (
+ "dtail/clients/remote"
+ "dtail/ssh/client"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+type connectionMaker interface {
+ makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection
+}
diff --git a/clients/grepclient.go b/clients/grepclient.go
new file mode 100644
index 0000000..dbae96c
--- /dev/null
+++ b/clients/grepclient.go
@@ -0,0 +1,49 @@
+package clients
+
+import (
+ "dtail/clients/handlers"
+ "dtail/clients/remote"
+ "dtail/ssh/client"
+ "errors"
+ "fmt"
+ "strings"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed.
+type GrepClient struct {
+ baseClient
+}
+
+// NewGrepClient creates a new grep client.
+func NewGrepClient(args Args) (*GrepClient, error) {
+ if args.Regex == "" {
+ return nil, errors.New("No regex specified, use '-regex' flag")
+ }
+
+ c := GrepClient{
+ baseClient: baseClient{
+ Args: args,
+ stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ throttleCh: make(chan struct{}, args.MaxInitConnections),
+ retry: false,
+ },
+ }
+
+ c.init(c)
+
+ return &c, nil
+}
+
+func (c GrepClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
+ conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
+ conn.Handler = handlers.NewClientHandler(server, c.PingTimeout)
+
+ for _, file := range strings.Split(c.Files, ",") {
+ conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
+ }
+
+ return conn
+}
diff --git a/clients/handlers/basehandler.go b/clients/handlers/basehandler.go
new file mode 100644
index 0000000..ce82aa2
--- /dev/null
+++ b/clients/handlers/basehandler.go
@@ -0,0 +1,134 @@
+package handlers
+
+import (
+ "dtail/logger"
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+)
+
+type baseHandler struct {
+ server string
+ shellStarted bool
+ commands chan string
+ pong chan struct{}
+ receiveBuf []byte
+ stop chan struct{}
+ pingTimeout int
+}
+
+func (h *baseHandler) Server() string {
+ return h.server
+}
+
+// Used to determine whether server is still responding to requests or not.
+func (h *baseHandler) Ping() error {
+ if h.pingTimeout == 0 {
+ // Server ping disabled
+ return nil
+ }
+
+ if err := h.SendCommand("ping"); err != nil {
+ return err
+ }
+
+ select {
+ case <-h.pong:
+ return nil
+ case <-time.After(time.Duration(h.pingTimeout) * time.Second):
+ }
+
+ return errors.New("Didn't receive any server pongs (ping replies)")
+}
+
+func (h *baseHandler) SendCommand(command string) error {
+ if command == "ping" {
+ logger.Trace("Sending command", h.server, command)
+ } else {
+ logger.Debug("Sending command", h.server, command)
+ }
+
+ select {
+ case h.commands <- fmt.Sprintf("%s;", command):
+ case <-time.After(time.Second * 5):
+ return errors.New("Timed out sending command " + command)
+ case <-h.stop:
+ }
+
+ return nil
+}
+
+// Read data from the dtail server via Writer interface.
+func (h *baseHandler) Write(p []byte) (n int, err error) {
+ for _, b := range p {
+ h.receiveBuf = append(h.receiveBuf, b)
+ if b == '\n' {
+ if len(h.receiveBuf) == 0 {
+ continue
+ }
+ message := string(h.receiveBuf)
+ h.handleMessageType(message)
+ }
+ }
+
+ return len(p), nil
+}
+
+// Send data to the dtail server via Reader interface.
+func (h *baseHandler) Read(p []byte) (n int, err error) {
+ select {
+ case command := <-h.commands:
+ n = copy(p, []byte(command))
+ case <-h.stop:
+ return 0, io.EOF
+ }
+ return
+}
+
+// Handle various message types.
+func (h *baseHandler) handleMessageType(message string) {
+ if len(h.receiveBuf) == 0 {
+ return
+ }
+ // Hidden server commands starti with a dot "."
+ if h.receiveBuf[0] == '.' {
+ h.handleHiddenMessage(message)
+ h.receiveBuf = h.receiveBuf[:0]
+ return
+ }
+
+ // Silent mode will only print out remote logs but not remote server
+ // commands. But remote server commands will be still logged to ./log/.
+ if logger.Mode == logger.SilentMode {
+ if h.receiveBuf[0] == 'R' {
+ logger.Raw(message)
+ }
+ h.receiveBuf = h.receiveBuf[:0]
+ return
+ }
+ logger.Raw(message)
+ h.receiveBuf = h.receiveBuf[:0]
+}
+
+// Handle messages received from server which are not meant to be displayed
+// to the end user.
+func (h *baseHandler) handleHiddenMessage(message string) {
+ switch {
+ case strings.HasPrefix(message, ".pong"):
+ h.pong <- struct{}{}
+ case strings.HasPrefix(message, ".syn close connection"):
+ h.SendCommand("ack close connection")
+ }
+}
+
+// Stop the handler.
+func (h *baseHandler) Stop() {
+ select {
+ case <-h.stop:
+ default:
+ logger.Debug("Stopping base handler", h.server)
+ close(h.stop)
+ }
+}
diff --git a/clients/handlers/clienthandler.go b/clients/handlers/clienthandler.go
new file mode 100644
index 0000000..e818b52
--- /dev/null
+++ b/clients/handlers/clienthandler.go
@@ -0,0 +1,26 @@
+package handlers
+
+import (
+ "dtail/logger"
+)
+
+// ClientHandler is the basic client handler interface.
+type ClientHandler struct {
+ baseHandler
+}
+
+// NewClientHandler creates a new client handler.
+func NewClientHandler(server string, pingTimeout int) *ClientHandler {
+ logger.Debug(server, "Creating new client handler")
+
+ return &ClientHandler{
+ baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ pong: make(chan struct{}, 1),
+ stop: make(chan struct{}),
+ pingTimeout: pingTimeout,
+ },
+ }
+}
diff --git a/clients/handlers/handler.go b/clients/handlers/handler.go
new file mode 100644
index 0000000..2013be0
--- /dev/null
+++ b/clients/handlers/handler.go
@@ -0,0 +1,12 @@
+package handlers
+
+import "io"
+
+// Handler provides all methods which can be run on any client handler.
+type Handler interface {
+ io.ReadWriter
+ Ping() error
+ Stop()
+ SendCommand(command string) error
+ Server() string
+}
diff --git a/clients/handlers/healthhandler.go b/clients/handlers/healthhandler.go
new file mode 100644
index 0000000..4051e2c
--- /dev/null
+++ b/clients/handlers/healthhandler.go
@@ -0,0 +1,75 @@
+package handlers
+
+import (
+ "errors"
+ "fmt"
+ "time"
+)
+
+// HealthHandler implements the handler required for health checks.
+type HealthHandler struct {
+ // Buffer of incoming data from server.
+ receiveBuf []byte
+ // To send commands to the server.
+ commands chan string
+ // To receive messages from the server.
+ receive chan<- string
+ // The remote server address
+ server string
+}
+
+// NewHealthHandler returns a new health check handler.
+func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
+ h := HealthHandler{
+ server: server,
+ receive: receive,
+ commands: make(chan string),
+ }
+
+ return &h
+}
+
+// Server returns the remote server name.
+func (h *HealthHandler) Server() string {
+ return h.server
+}
+
+// Stop is not of use for health check handler.
+func (h *HealthHandler) Stop() {
+ // Nothing done here.
+}
+
+// Ping is not of use for health check handler.
+func (h *HealthHandler) Ping() error {
+ return nil
+}
+
+// SendCommand send a DTail command to the server.
+func (h *HealthHandler) SendCommand(command string) error {
+ select {
+ case h.commands <- fmt.Sprintf("%s;", command):
+ case <-time.NewTimer(time.Second * 10).C:
+ return errors.New("Timed out sending command " + command)
+ }
+
+ return nil
+}
+
+// Server writes byte stream to client.
+func (h *HealthHandler) Write(p []byte) (n int, err error) {
+ for _, b := range p {
+ h.receiveBuf = append(h.receiveBuf, b)
+ if b == '\n' {
+ h.receive <- string(h.receiveBuf)
+ h.receiveBuf = h.receiveBuf[:0]
+ }
+ }
+
+ return len(p), nil
+}
+
+// Server reads byte stream from client.
+func (h *HealthHandler) Read(p []byte) (n int, err error) {
+ n = copy(p, []byte(<-h.commands))
+ return
+}
diff --git a/clients/handlers/maprhandler.go b/clients/handlers/maprhandler.go
new file mode 100644
index 0000000..830a142
--- /dev/null
+++ b/clients/handlers/maprhandler.go
@@ -0,0 +1,74 @@
+package handlers
+
+import (
+ "dtail/logger"
+ "dtail/mapr"
+ "dtail/mapr/client"
+ "strings"
+)
+
+// MaprHandler is the handler used on the client side for running mapreduce aggregations.
+type MaprHandler struct {
+ baseHandler
+ aggregate *client.Aggregate
+ query *mapr.Query
+ count uint64
+}
+
+// NewMaprHandler returns a new mapreduce client handler.
+func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler {
+ return &MaprHandler{
+ baseHandler: baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ pong: make(chan struct{}, 1),
+ stop: make(chan struct{}),
+ pingTimeout: pingTimeout,
+ },
+ query: query,
+ aggregate: client.NewAggregate(server, query, globalGroup),
+ }
+}
+
+// Read data from the dtail server via Writer interface.
+func (h *MaprHandler) Write(p []byte) (n int, err error) {
+ for _, b := range p {
+ h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
+ if b == '\n' {
+ if len(h.baseHandler.receiveBuf) == 0 {
+ continue
+ }
+ message := string(h.baseHandler.receiveBuf)
+
+ if h.baseHandler.receiveBuf[0] == 'A' {
+ h.handleAggregateMessage(strings.TrimSpace(message))
+ h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
+ continue
+ }
+ h.baseHandler.handleMessageType(message)
+ }
+ }
+
+ return len(p), nil
+}
+
+// Handle a message received from server including mapr aggregation
+// related data.
+func (h *MaprHandler) handleAggregateMessage(message string) {
+ h.count++
+ parts := strings.Split(message, "|")
+
+ // Index 0 contains 'AGGREGATE', 1 contains server host.
+ // Aggregation data begins from index 2.
+ logger.Debug("Received aggregate data", h.server, h.count)
+ h.aggregate.Aggregate(parts[2:])
+ logger.Debug("Aggregated aggregate data", h.server, h.count)
+}
+
+// Stop stops the mapreduce client handler.
+func (h *MaprHandler) Stop() {
+ logger.Debug("Stopping mapreduce handler", h.server)
+ h.aggregate.Stop()
+ h.baseHandler.Stop()
+}
diff --git a/clients/healthclient.go b/clients/healthclient.go
new file mode 100644
index 0000000..1fae99c
--- /dev/null
+++ b/clients/healthclient.go
@@ -0,0 +1,96 @@
+package clients
+
+import (
+ "dtail/clients/handlers"
+ "dtail/clients/remote"
+ "dtail/config"
+ "dtail/omode"
+ "fmt"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// HealthClient is used for health checking (e.g. via Nagios)
+type HealthClient struct {
+ // Client operating mode
+ mode omode.Mode
+ // The remote server address
+ server string
+ // SSH user name
+ userName string
+ // SSH auth methods to use to connect to the remote servers.
+ sshAuthMethods []gossh.AuthMethod
+}
+
+// NewHealthClient returns a new healh client.
+func NewHealthClient(mode omode.Mode) (*HealthClient, error) {
+ c := HealthClient{
+ mode: mode,
+ server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort),
+ userName: config.ControlUser,
+ }
+ c.initSSHAuthMethods()
+
+ return &c, nil
+}
+
+// Start the health client.
+func (c *HealthClient) Start(wg *sync.WaitGroup) (status int) {
+ defer wg.Done()
+ receive := make(chan string)
+
+ throttleCh := make(chan struct{}, runtime.NumCPU())
+ statsCh := make(chan struct{}, 1)
+
+ conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods)
+ conn.Handler = handlers.NewHealthHandler(c.server, receive)
+ conn.Commands = []string{c.mode.String()}
+
+ go conn.Start(throttleCh, statsCh)
+ defer conn.Stop()
+
+ for {
+ select {
+ case data := <-receive:
+ // Parse recieved data.
+ s := strings.Split(data, "|")
+ message := s[len(s)-1]
+ if strings.HasPrefix(message, "done;") {
+ return
+ }
+
+ // Set severity.
+ s = strings.Split(message, ":")
+ switch s[0] {
+ case "OK":
+ case "WARNING":
+ if status < 1 {
+ status = 1
+ }
+ case "CRITICAL":
+ status = 2
+ case "UNKNOWN":
+ status = 3
+ default:
+ fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message)
+ status = 2
+ return
+ }
+ fmt.Print(message)
+
+ case <-time.After(time.Second * 2):
+ status = 2
+ fmt.Println("CRITICAL: Could not communicate with DTail server")
+ return
+ }
+ }
+}
+
+// Initialize SSH auth methods.
+func (c *HealthClient) initSSHAuthMethods() {
+ c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
+}
diff --git a/clients/maprclient.go b/clients/maprclient.go
new file mode 100644
index 0000000..ad707c9
--- /dev/null
+++ b/clients/maprclient.go
@@ -0,0 +1,153 @@
+package clients
+
+import (
+ "dtail/clients/handlers"
+ "dtail/clients/remote"
+ "dtail/logger"
+ "dtail/mapr"
+ "dtail/omode"
+ "dtail/ssh/client"
+ "errors"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// MaprClient is used for running mapreduce aggregations on remote files.
+type MaprClient struct {
+ baseClient
+ // Query string for mapr aggregations
+ queryStr string
+ // Global group set for merged mapr aggregation results
+ globalGroup *mapr.GlobalGroupSet
+ // The query object (constructed from queryStr)
+ query *mapr.Query
+ // Additative result or new result every run?
+ additative bool
+}
+
+// NewMaprClient returns a new mapreduce client.
+func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
+ if queryStr == "" {
+ return nil, errors.New("No mapreduce query specified, use '-query' flag")
+ }
+
+ c := MaprClient{
+ baseClient: baseClient{
+ Args: args,
+ stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ throttleCh: make(chan struct{}, args.MaxInitConnections),
+ retry: args.Mode == omode.TailClient,
+ },
+ queryStr: queryStr,
+ additative: args.Mode == omode.MapClient,
+ }
+
+ query, err := mapr.NewQuery(c.queryStr)
+ if err != nil {
+ logger.FatalExit(c.queryStr, "Can't parse mapr query", err)
+ }
+
+ c.query = query
+
+ switch c.query.Table {
+ case "*":
+ c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|")
+ case ".":
+ c.Regex = "."
+ default:
+ c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table)
+ }
+
+ c.globalGroup = mapr.NewGlobalGroupSet()
+ c.baseClient.init(c)
+
+ return &c, nil
+}
+
+func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
+ conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
+ conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout)
+
+ conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery))
+ commandStr := "tail"
+ if c.additative {
+ commandStr = "cat"
+ }
+
+ for _, file := range strings.Split(c.Files, ",") {
+ conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex))
+ }
+
+ return conn
+}
+
+// Start starts the mapreduce client.
+func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) {
+ defer wg.Done()
+
+ if c.query.Outfile == "" {
+ // Only print out periodic results if we don't write an outfile
+ go c.periodicPrintResults()
+ }
+
+ status = c.baseClient.Start(nil)
+ if c.additative {
+ c.recievedFinalResult()
+ }
+ c.baseClient.Stop()
+
+ return
+}
+
+func (c *MaprClient) recievedFinalResult() {
+ logger.Info("Received final mapreduce result")
+
+ if c.query.Outfile == "" {
+ c.printResults()
+ return
+ }
+
+ logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile))
+ err := c.globalGroup.WriteResult(c.query)
+ if err != nil {
+ logger.FatalExit(err)
+ return
+ }
+ logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
+}
+
+func (c *MaprClient) periodicPrintResults() {
+ for {
+ select {
+ case <-time.After(c.query.Interval):
+ logger.Info("Gathering interim mapreduce result")
+ c.printResults()
+ case <-c.baseClient.stop:
+ return
+ }
+ }
+}
+
+func (c *MaprClient) printResults() {
+ var result string
+ var err error
+ var numLines int
+
+ if c.additative {
+ result, numLines, err = c.globalGroup.Result(c.query)
+ } else {
+ result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
+ }
+ if err != nil {
+ logger.FatalExit(err)
+ }
+ if numLines > 0 {
+ logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
+ logger.Raw(result)
+ }
+}
diff --git a/clients/remote/connection.go b/clients/remote/connection.go
new file mode 100644
index 0000000..bd93239
--- /dev/null
+++ b/clients/remote/connection.go
@@ -0,0 +1,230 @@
+package remote
+
+import (
+ "dtail/clients/handlers"
+ "dtail/config"
+ "dtail/logger"
+ "dtail/ssh/client"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+
+ "golang.org/x/crypto/ssh"
+)
+
+// Connection represents a client connection connection to a single server.
+type Connection struct {
+ // The remote server's hostname connected to.
+ Server string
+ // The remote server's port connected to.
+ port int
+ // The SSH client configuration used.
+ config *ssh.ClientConfig
+ // The SSH client handler to use.
+ Handler handlers.Handler
+ // DTail commands sent from client to server. When client loses
+ // connection to the server it re-connects automatically and sends the
+ // same commands again.
+ Commands []string
+ // Is it a persistent connection or a one-off?
+ isOneOff bool
+ // Used to stop the connection
+ stop chan struct{}
+ // To deal with SSH server host keys
+ hostKeyCallback *client.HostKeyCallback
+}
+
+// NewConnection returns a new connection.
+func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *Connection {
+ logger.Debug(server, "Creating new connection")
+
+ c := Connection{
+ hostKeyCallback: hostKeyCallback,
+ config: &ssh.ClientConfig{
+ User: userName,
+ Auth: authMethods,
+ HostKeyCallback: hostKeyCallback.Wrap(),
+ Timeout: time.Second * 3,
+ },
+ stop: make(chan struct{}),
+ }
+
+ c.initServerPort(server)
+
+ return &c
+}
+
+// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit).
+func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection {
+ c := Connection{
+ config: &ssh.ClientConfig{
+ User: userName,
+ Auth: authMethods,
+ HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+ },
+ stop: make(chan struct{}),
+ isOneOff: true,
+ }
+
+ c.initServerPort(server)
+
+ return &c
+}
+
+// Attempt to parse the server port address from the provided server FQDN.
+func (c *Connection) initServerPort(server string) {
+ c.Server = server
+ c.port = config.Common.SSHPort
+ parts := strings.Split(server, ":")
+
+ if len(parts) == 2 {
+ logger.Debug("Parsing port from hostname", parts)
+ port, err := strconv.Atoi(parts[1])
+ if err != nil {
+ logger.FatalExit("Unable to parse client port", server, parts, err)
+ }
+ c.Server = parts[0]
+ c.port = port
+ }
+}
+
+// Start the server connection. Build up SSH session and send some DTail commandc.
+func (c *Connection) Start(throttleCh, statsCh chan struct{}) {
+ select {
+ case <-c.stop:
+ logger.Info(c.Server, c.port, "Disconnecting client")
+ return
+ default:
+ }
+
+ // Wait for SSH connection throttler
+ throttleCh <- struct{}{}
+
+ // Wait until connection has been initiated or an error occured
+ // during initialization.
+ throttleStopCh := make(chan struct{}, 2)
+ go func() {
+ <-throttleStopCh
+ <-throttleCh
+ }()
+
+ if err := c.dial(c.Server, c.port, throttleStopCh, statsCh); err != nil {
+ logger.Warn(c.Server, c.port, err)
+ throttleStopCh <- struct{}{}
+
+ if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) {
+ logger.Debug("Not trusting host, not trying to re-connect", c.Server, c.port)
+ return
+ }
+ }
+}
+
+// Dail into a new SSH connection. Close connection in case of an error.
+func (c *Connection) dial(host string, port int, throttleStopCh, statsCh chan struct{}) error {
+ statsCh <- struct{}{}
+ defer func() { <-statsCh }()
+
+ logger.Debug(host, "dial")
+ address := fmt.Sprintf("%s:%d", host, port)
+
+ client, err := ssh.Dial("tcp", address, c.config)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ return c.session(client, throttleStopCh)
+}
+
+// Create the SSH session. Close the session in case of an error.
+func (c *Connection) session(client *ssh.Client, throttleStopCh chan<- struct{}) error {
+ logger.Debug(c.Server, "session")
+
+ session, err := client.NewSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+
+ return c.handle(session, throttleStopCh)
+}
+
+// Handle the SSH session. Also send periodic pings to the server in order
+// to determine that session is still intact.
+func (c *Connection) handle(session *ssh.Session, throttleStopCh chan<- struct{}) error {
+ defer c.Handler.Stop()
+
+ logger.Debug(c.Server, "handle")
+
+ stdinPipe, err := session.StdinPipe()
+ if err != nil {
+ return err
+ }
+
+ stdoutPipe, err := session.StdoutPipe()
+ if err != nil {
+ return err
+ }
+
+ if err := session.Shell(); err != nil {
+ return err
+ }
+
+ // Establish Bi-directional pipe between SSH session and client handler.
+ brokenStdinPipe := make(chan struct{})
+ go func() {
+ defer close(brokenStdinPipe)
+ io.Copy(stdinPipe, c.Handler)
+ }()
+
+ brokenStdoutPipe := make(chan struct{})
+ go func() {
+ defer close(brokenStdoutPipe)
+ io.Copy(c.Handler, stdoutPipe)
+ }()
+
+ // SSH session established, other goroutine can initiate session now.
+ throttleStopCh <- struct{}{}
+
+ // Send all commands to client.
+ for _, command := range c.Commands {
+ logger.Debug(command)
+ c.Handler.SendCommand(command)
+ }
+
+ if !c.isOneOff {
+ return c.periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe)
+ }
+
+ <-c.stop
+
+ // Normal shutdown, all fine
+ return nil
+}
+
+// Periodically check whether connection is still alive or not.
+func (c *Connection) periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe <-chan struct{}) error {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ if err := c.Handler.Ping(); err != nil {
+ return err
+ }
+ case <-brokenStdinPipe:
+ logger.Debug("Broken stdin pipe", c.Server, c.port)
+ return nil
+ case <-brokenStdoutPipe:
+ logger.Debug("Broken stdout pipe", c.Server, c.port)
+ return nil
+ case <-c.stop:
+ return nil
+ }
+ }
+}
+
+// Stop the connection.
+func (c *Connection) Stop() {
+ close(c.stop)
+}
diff --git a/clients/stats.go b/clients/stats.go
new file mode 100644
index 0000000..e5b9bed
--- /dev/null
+++ b/clients/stats.go
@@ -0,0 +1,81 @@
+package clients
+
+import (
+ "dtail/logger"
+ "fmt"
+ "runtime"
+ "sync"
+ "time"
+)
+
+// Used to collect and display various client stats.
+type stats struct {
+ // Total amount servers to connect to.
+ connectionsTotal int
+ // To keep track of what connected and disconnected
+ connectionsEstCh chan struct{}
+ // Amount of servers connections are established.
+ connected int
+ // To synchronize concurrent access.
+ mutex sync.Mutex
+}
+
+func newTailStats(connectionsTotal int) *stats {
+ return &stats{
+ connectionsTotal: connectionsTotal,
+ connectionsEstCh: make(chan struct{}, connectionsTotal),
+ connected: 0,
+ }
+}
+
+func (s *stats) periodicLogStats(throttleCh chan struct{}, stop <-chan struct{}) {
+ connectedLast := 0
+ statsInterval := 5
+
+ for {
+ select {
+ case <-time.After(time.Second * time.Duration(statsInterval)):
+ case <-stop:
+ return
+ }
+
+ connected := len(s.connectionsEstCh)
+ throttle := len(throttleCh)
+
+ newConnections := connected - connectedLast
+ connectionsPerSecond := float64(newConnections) / float64(statsInterval)
+ s.log(connected, newConnections, connectionsPerSecond, throttle)
+
+ connectedLast = connected
+
+ s.mutex.Lock()
+ s.connected = connected
+ s.mutex.Unlock()
+ }
+}
+
+func (s *stats) numConnected() int {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ return s.connected
+}
+
+func (s *stats) log(connected, newConnections int, connectionsPerSecond float64, throttle int) {
+ percConnected := percentOf(float64(s.connectionsTotal), float64(connected))
+
+ connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))
+ newConnStr := fmt.Sprintf("new=%d", newConnections)
+ rateStr := fmt.Sprintf("rate=%2.2f/s", connectionsPerSecond)
+ throttleStr := fmt.Sprintf("throttle=%d", throttle)
+ cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())
+
+ logger.Info("stats", connectedStr, newConnStr, rateStr, throttleStr, cpusGoroutinesStr)
+}
+
+func percentOf(total float64, value float64) float64 {
+ if total == 0 || total == value {
+ return 100
+ }
+ return value / (total / 100.0)
+}
diff --git a/clients/tailclient.go b/clients/tailclient.go
new file mode 100644
index 0000000..cb93258
--- /dev/null
+++ b/clients/tailclient.go
@@ -0,0 +1,44 @@
+package clients
+
+import (
+ "dtail/clients/handlers"
+ "dtail/clients/remote"
+ "dtail/ssh/client"
+ "fmt"
+ "strings"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// TailClient is used for tailing remote log files (opening, seeking to the end and returning only new incoming lines).
+type TailClient struct {
+ baseClient
+}
+
+// NewTailClient returns a new TailClient.
+func NewTailClient(args Args) (*TailClient, error) {
+ c := TailClient{
+ baseClient: baseClient{
+ Args: args,
+ stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ throttleCh: make(chan struct{}, args.MaxInitConnections),
+ retry: true,
+ },
+ }
+
+ c.init(c)
+
+ return &c, nil
+}
+
+func (c TailClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
+ conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
+ conn.Handler = handlers.NewClientHandler(server, c.PingTimeout)
+
+ for _, file := range strings.Split(c.Files, ",") {
+ conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
+ }
+
+ return conn
+}