summaryrefslogtreecommitdiff
path: root/internal/clients/maprclient.go
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-26 11:26:53 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-02-07 13:31:15 +0000
commit0945da8dfefcbb723eecea0e5f4eafff63398253 (patch)
treef06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/clients/maprclient.go
parent2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff)
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/clients/maprclient.go')
-rw-r--r--internal/clients/maprclient.go52
1 files changed, 24 insertions, 28 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 9070827..b581844 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -1,6 +1,7 @@
package clients
import (
+ "context"
"errors"
"fmt"
"runtime"
@@ -8,13 +9,9 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/clients/remote"
- "github.com/mimecast/dtail/internal/logger"
+ "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
- "github.com/mimecast/dtail/internal/ssh/client"
-
- gossh "golang.org/x/crypto/ssh"
)
// MaprClient is used for running mapreduce aggregations on remote files.
@@ -39,8 +36,6 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
c := MaprClient{
baseClient: baseClient{
Args: args,
- stop: make(chan struct{}),
- stopped: make(chan struct{}),
throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
retry: args.Mode == omode.TailClient,
},
@@ -70,35 +65,36 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
return &c, nil
}
-func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout)
+// Start starts the mapreduce client.
+func (c *MaprClient) Start(ctx context.Context) (status int) {
+ if c.query.Outfile == "" {
+ // Only print out periodic results if we don't write an outfile
+ go c.periodicPrintResults(ctx)
+ }
- conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery))
- commandStr := "tail"
+ status = c.baseClient.Start(ctx)
if c.additative {
- commandStr = "cat"
+ c.recievedFinalResult()
}
- for _, file := range strings.Split(c.Files, ",") {
- conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex))
- }
+ return
+}
- return conn
+func (c MaprClient) makeHandler(server string) handlers.Handler {
+ return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
-// Start starts the mapreduce client.
-func (c *MaprClient) Start() (status int) {
- if c.query.Outfile == "" {
- // Only print out periodic results if we don't write an outfile
- go c.periodicPrintResults()
- }
+func (c MaprClient) makeCommands() (commands []string) {
+ commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
- status = c.baseClient.Start()
+ modeStr := "tail"
if c.additative {
- c.recievedFinalResult()
+ modeStr = "cat"
+ }
+
+ for _, file := range strings.Split(c.What, ",") {
+ commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex))
}
- c.baseClient.Stop()
return
}
@@ -120,13 +116,13 @@ func (c *MaprClient) recievedFinalResult() {
logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
}
-func (c *MaprClient) periodicPrintResults() {
+func (c *MaprClient) periodicPrintResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
logger.Info("Gathering interim mapreduce result")
c.printResults()
- case <-c.baseClient.stop:
+ case <-ctx.Done():
return
}
}