summaryrefslogtreecommitdiff
path: root/clients/maprclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'clients/maprclient.go')
-rw-r--r--clients/maprclient.go153
1 files changed, 153 insertions, 0 deletions
diff --git a/clients/maprclient.go b/clients/maprclient.go
new file mode 100644
index 0000000..ad707c9
--- /dev/null
+++ b/clients/maprclient.go
@@ -0,0 +1,153 @@
+package clients
+
+import (
+ "dtail/clients/handlers"
+ "dtail/clients/remote"
+ "dtail/logger"
+ "dtail/mapr"
+ "dtail/omode"
+ "dtail/ssh/client"
+ "errors"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+// MaprClient is used for running mapreduce aggregations on remote files.
+type MaprClient struct {
+ baseClient
+ // Query string for mapr aggregations
+ queryStr string
+ // Global group set for merged mapr aggregation results
+ globalGroup *mapr.GlobalGroupSet
+ // The query object (constructed from queryStr)
+ query *mapr.Query
+ // Additative result or new result every run?
+ additative bool
+}
+
+// NewMaprClient returns a new mapreduce client.
+func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
+ if queryStr == "" {
+ return nil, errors.New("No mapreduce query specified, use '-query' flag")
+ }
+
+ c := MaprClient{
+ baseClient: baseClient{
+ Args: args,
+ stop: make(chan struct{}),
+ stopped: make(chan struct{}),
+ throttleCh: make(chan struct{}, args.MaxInitConnections),
+ retry: args.Mode == omode.TailClient,
+ },
+ queryStr: queryStr,
+ additative: args.Mode == omode.MapClient,
+ }
+
+ query, err := mapr.NewQuery(c.queryStr)
+ if err != nil {
+ logger.FatalExit(c.queryStr, "Can't parse mapr query", err)
+ }
+
+ c.query = query
+
+ switch c.query.Table {
+ case "*":
+ c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|")
+ case ".":
+ c.Regex = "."
+ default:
+ c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table)
+ }
+
+ c.globalGroup = mapr.NewGlobalGroupSet()
+ c.baseClient.init(c)
+
+ return &c, nil
+}
+
+func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
+ conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
+ conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout)
+
+ conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery))
+ commandStr := "tail"
+ if c.additative {
+ commandStr = "cat"
+ }
+
+ for _, file := range strings.Split(c.Files, ",") {
+ conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex))
+ }
+
+ return conn
+}
+
+// Start starts the mapreduce client.
+func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) {
+ defer wg.Done()
+
+ if c.query.Outfile == "" {
+ // Only print out periodic results if we don't write an outfile
+ go c.periodicPrintResults()
+ }
+
+ status = c.baseClient.Start(nil)
+ if c.additative {
+ c.recievedFinalResult()
+ }
+ c.baseClient.Stop()
+
+ return
+}
+
+func (c *MaprClient) recievedFinalResult() {
+ logger.Info("Received final mapreduce result")
+
+ if c.query.Outfile == "" {
+ c.printResults()
+ return
+ }
+
+ logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile))
+ err := c.globalGroup.WriteResult(c.query)
+ if err != nil {
+ logger.FatalExit(err)
+ return
+ }
+ logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
+}
+
+func (c *MaprClient) periodicPrintResults() {
+ for {
+ select {
+ case <-time.After(c.query.Interval):
+ logger.Info("Gathering interim mapreduce result")
+ c.printResults()
+ case <-c.baseClient.stop:
+ return
+ }
+ }
+}
+
+func (c *MaprClient) printResults() {
+ var result string
+ var err error
+ var numLines int
+
+ if c.additative {
+ result, numLines, err = c.globalGroup.Result(c.query)
+ } else {
+ result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
+ }
+ if err != nil {
+ logger.FatalExit(err)
+ }
+ if numLines > 0 {
+ logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
+ logger.Raw(result)
+ }
+}