diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-26 11:26:53 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-07 13:31:15 +0000 |
| commit | 0945da8dfefcbb723eecea0e5f4eafff63398253 (patch) | |
| tree | f06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/clients/maprclient.go | |
| parent | 2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff) | |
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/clients/maprclient.go')
| -rw-r--r-- | internal/clients/maprclient.go | 52 |
1 files changed, 24 insertions, 28 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 9070827..b581844 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -1,6 +1,7 @@ package clients import ( + "context" "errors" "fmt" "runtime" @@ -8,13 +9,9 @@ import ( "time" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/clients/remote" - "github.com/mimecast/dtail/internal/logger" + "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/ssh/client" - - gossh "golang.org/x/crypto/ssh" ) // MaprClient is used for running mapreduce aggregations on remote files. @@ -39,8 +36,6 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { c := MaprClient{ baseClient: baseClient{ Args: args, - stop: make(chan struct{}), - stopped: make(chan struct{}), throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), retry: args.Mode == omode.TailClient, }, @@ -70,35 +65,36 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { return &c, nil } -func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout) +// Start starts the mapreduce client. +func (c *MaprClient) Start(ctx context.Context) (status int) { + if c.query.Outfile == "" { + // Only print out periodic results if we don't write an outfile + go c.periodicPrintResults(ctx) + } - conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery)) - commandStr := "tail" + status = c.baseClient.Start(ctx) if c.additative { - commandStr = "cat" + c.recievedFinalResult() } - for _, file := range strings.Split(c.Files, ",") { - conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex)) - } + return +} - return conn +func (c MaprClient) makeHandler(server string) handlers.Handler { + return handlers.NewMaprHandler(server, c.query, c.globalGroup) } -// Start starts the mapreduce client. -func (c *MaprClient) Start() (status int) { - if c.query.Outfile == "" { - // Only print out periodic results if we don't write an outfile - go c.periodicPrintResults() - } +func (c MaprClient) makeCommands() (commands []string) { + commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - status = c.baseClient.Start() + modeStr := "tail" if c.additative { - c.recievedFinalResult() + modeStr = "cat" + } + + for _, file := range strings.Split(c.What, ",") { + commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex)) } - c.baseClient.Stop() return } @@ -120,13 +116,13 @@ func (c *MaprClient) recievedFinalResult() { logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) } -func (c *MaprClient) periodicPrintResults() { +func (c *MaprClient) periodicPrintResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): logger.Info("Gathering interim mapreduce result") c.printResults() - case <-c.baseClient.stop: + case <-ctx.Done(): return } } |
