diff options
Diffstat (limited to 'clients/maprclient.go')
| -rw-r--r-- | clients/maprclient.go | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/clients/maprclient.go b/clients/maprclient.go new file mode 100644 index 0000000..ad707c9 --- /dev/null +++ b/clients/maprclient.go @@ -0,0 +1,153 @@ +package clients + +import ( + "dtail/clients/handlers" + "dtail/clients/remote" + "dtail/logger" + "dtail/mapr" + "dtail/omode" + "dtail/ssh/client" + "errors" + "fmt" + "strings" + "sync" + "time" + + gossh "golang.org/x/crypto/ssh" +) + +// MaprClient is used for running mapreduce aggregations on remote files. +type MaprClient struct { + baseClient + // Query string for mapr aggregations + queryStr string + // Global group set for merged mapr aggregation results + globalGroup *mapr.GlobalGroupSet + // The query object (constructed from queryStr) + query *mapr.Query + // Additative result or new result every run? + additative bool +} + +// NewMaprClient returns a new mapreduce client. +func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { + if queryStr == "" { + return nil, errors.New("No mapreduce query specified, use '-query' flag") + } + + c := MaprClient{ + baseClient: baseClient{ + Args: args, + stop: make(chan struct{}), + stopped: make(chan struct{}), + throttleCh: make(chan struct{}, args.MaxInitConnections), + retry: args.Mode == omode.TailClient, + }, + queryStr: queryStr, + additative: args.Mode == omode.MapClient, + } + + query, err := mapr.NewQuery(c.queryStr) + if err != nil { + logger.FatalExit(c.queryStr, "Can't parse mapr query", err) + } + + c.query = query + + switch c.query.Table { + case "*": + c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|") + case ".": + c.Regex = "." + default: + c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) + } + + c.globalGroup = mapr.NewGlobalGroupSet() + c.baseClient.init(c) + + return &c, nil +} + +func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { + conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) + conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout) + + conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery)) + commandStr := "tail" + if c.additative { + commandStr = "cat" + } + + for _, file := range strings.Split(c.Files, ",") { + conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex)) + } + + return conn +} + +// Start starts the mapreduce client. +func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) { + defer wg.Done() + + if c.query.Outfile == "" { + // Only print out periodic results if we don't write an outfile + go c.periodicPrintResults() + } + + status = c.baseClient.Start(nil) + if c.additative { + c.recievedFinalResult() + } + c.baseClient.Stop() + + return +} + +func (c *MaprClient) recievedFinalResult() { + logger.Info("Received final mapreduce result") + + if c.query.Outfile == "" { + c.printResults() + return + } + + logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile)) + err := c.globalGroup.WriteResult(c.query) + if err != nil { + logger.FatalExit(err) + return + } + logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) +} + +func (c *MaprClient) periodicPrintResults() { + for { + select { + case <-time.After(c.query.Interval): + logger.Info("Gathering interim mapreduce result") + c.printResults() + case <-c.baseClient.stop: + return + } + } +} + +func (c *MaprClient) printResults() { + var result string + var err error + var numLines int + + if c.additative { + result, numLines, err = c.globalGroup.Result(c.query) + } else { + result, numLines, err = c.globalGroup.SwapOut().Result(c.query) + } + if err != nil { + logger.FatalExit(err) + } + if numLines > 0 { + logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) + logger.Raw(result) + } +} |
