summary refs log tree commit diff
path: root/internal/clients
diff options
context:
space:
mode:
Diffstat (limited to 'internal/clients')
-rw-r--r-- internal/clients/handlers/maprhandler.go | 8
-rw-r--r-- internal/clients/maker.go | 4
-rw-r--r-- internal/clients/maprclient.go | 209
-rw-r--r-- internal/clients/maprclient_test.go | 104
4 files changed, 239 insertions, 86 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 5a16d13..d4e171c 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -5,7 +5,6 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
- "github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -15,13 +14,11 @@ import (
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
- query *mapr.Query
removedNl bool
}
// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query,
- globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+func NewMaprHandler(server string, session *client.SessionState) *MaprHandler {
return &MaprHandler{
baseHandler: baseHandler{
@@ -34,8 +31,7 @@ func NewMaprHandler(server string, query *mapr.Query,
capabilitiesCh: make(chan struct{}),
sessionAcks: make(chan SessionAck, 4),
},
- query: query,
- aggregate: client.NewAggregate(server, query, globalGroup),
+ aggregate: client.NewAggregate(server, session),
}
}
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index 6d75fc1..81ced09 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -15,3 +15,7 @@ type maker interface {
type sessionSpecMaker interface {
makeSessionSpec() (SessionSpec, error)
}
+
+type sessionCommitter interface {
+ commitSessionSpec(spec SessionSpec, generation uint64) error
+}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 4bf0cf0..68770ec 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -11,6 +11,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
+ maprclient "github.com/mimecast/dtail/internal/mapr/client"
"github.com/mimecast/dtail/internal/omode"
)
@@ -29,14 +30,10 @@ const (
// MaprClient is used for running mapreduce aggregations on remote files.
type MaprClient struct {
baseClient
- // Global group set for merged mapr aggregation results
- globalGroup *mapr.GlobalGroupSet
- // The query object (constructed from queryStr)
- query *mapr.Query
- // Additative result or new result every interval run?
- cumulative bool
- // The last result string received
- lastResult string
+ // Shared mapreduce state for all handlers and reporting paths.
+ session *maprclient.SessionState
+ // Selected cumulative reporting mode.
+ mode MaprClientMode
}
// NewMaprClient returns a new mapreduce client.
@@ -53,19 +50,6 @@ func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient
// Don't retry connection if in tail mode and no outfile specified.
retry := args.Mode == omode.TailClient && !query.HasOutfile()
- var cumulative bool
- switch maprClientMode {
- case CumulativeMode:
- cumulative = true
- case NonCumulativeMode:
- cumulative = false
- default:
- // Result is comulative if we are in MapClient mode or with outfile
- cumulative = args.Mode == omode.MapClient || query.HasOutfile()
- }
-
- dlog.Client.Debug("Cumulative mapreduce mode?", cumulative)
-
c := MaprClient{
baseClient: baseClient{
Args: args,
@@ -73,22 +57,14 @@ func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient
retry: retry,
runtime: newClientRuntimeBoundary(config.CurrentRuntime()),
},
- query: query,
- cumulative: cumulative,
- }
-
- switch c.query.Table {
- case "", ".":
- c.RegexStr = "."
- case "*":
- c.RegexStr = "\\|MAPREDUCE:\\|"
- default:
- c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table)
+ session: maprclient.NewSessionState(query),
+ mode: maprClientMode,
}
+ dlog.Client.Debug("Cumulative mapreduce mode?", c.isCumulative(query))
- c.globalGroup = mapr.NewGlobalGroupSet()
+ c.setRegexForQuery(query)
c.baseClient.init()
- c.baseClient.makeConnections(c)
+ c.baseClient.makeConnections(&c)
return &c, nil
}
@@ -100,7 +76,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
// Always write final result for cumulative mode (includes outfile case)
- if c.cumulative {
+ if snapshot := c.session.Snapshot(); c.isCumulative(snapshot.Query) {
dlog.Client.Debug("Writing final mapreduce result")
if err := c.reportResults(true); err != nil {
dlog.Client.Error("Unable to write final mapreduce result", err)
@@ -113,17 +89,19 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
// NEXT: Make this a callback function rather trying to use polymorphism to call
// this. This applies to all clients. It will make the code easier to read.
-func (c MaprClient) makeHandler(server string) handlers.Handler {
- return handlers.NewMaprHandler(server, c.query, c.globalGroup)
+func (c *MaprClient) makeHandler(server string) handlers.Handler {
+ return handlers.NewMaprHandler(server, c.session)
}
-func (c MaprClient) makeSessionSpec() (SessionSpec, error) {
+func (c *MaprClient) makeSessionSpec() (SessionSpec, error) {
sessionSpec := NewSessionSpec(c.Args)
- sessionSpec.Query = c.query.RawQuery
+ if snapshot := c.session.Snapshot(); snapshot.Query != nil {
+ sessionSpec.Query = snapshot.Query.RawQuery
+ }
return sessionSpec, nil
}
-func (c MaprClient) makeCommands() (commands []string) {
+func (c *MaprClient) makeCommands() (commands []string) {
sessionSpec, err := c.makeSessionSpec()
if err != nil {
dlog.Client.FatalPanic("unable to build map session spec", err)
@@ -136,84 +114,96 @@ func (c MaprClient) makeCommands() (commands []string) {
}
func (c *MaprClient) periodicReportResults(ctx context.Context) {
- rampUpSleep := c.query.Interval / 2
- dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
+ var (
+ lastGeneration uint64
+ seenGeneration bool
+ )
- if rampUpSleep > 0 {
- rampUpTimer := time.NewTimer(rampUpSleep)
- select {
- case <-rampUpTimer.C:
- case <-ctx.Done():
- if !rampUpTimer.Stop() {
- select {
- case <-rampUpTimer.C:
- default:
- }
- }
- return
- }
- }
+ for {
+ snapshot := c.session.Snapshot()
+ rampUp := !seenGeneration || snapshot.Generation != lastGeneration
+ lastGeneration = snapshot.Generation
+ seenGeneration = true
- interval := c.query.Interval
- if interval <= 0 {
- interval = time.Second
- }
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
+ delay := c.reportDelay(snapshot.Query, rampUp)
+ dlog.Client.Debug("Sleeping before processing mapreduce results", "generation", snapshot.Generation, "delay", delay)
- for {
+ timer := time.NewTimer(delay)
select {
- case <-ticker.C:
+ case <-timer.C:
dlog.Client.Debug("Gathering interim mapreduce result")
if err := c.reportResults(false); err != nil {
dlog.Client.Error("Unable to gather mapreduce result", err)
}
+ case <-c.session.Changes():
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
+ dlog.Client.Debug("Mapreduce query generation changed, recalculating report interval")
case <-ctx.Done():
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
return
}
}
}
func (c *MaprClient) reportResults(finalResult bool) error {
- if c.query.HasOutfile() {
- return c.writeResultsToOutfile(finalResult)
+ snapshot := c.session.Snapshot()
+ if snapshot.Query == nil || snapshot.GlobalGroup == nil {
+ return nil
}
- return c.printResults()
+
+ if snapshot.Query.HasOutfile() {
+ return c.writeResultsToOutfile(snapshot, finalResult)
+ }
+ return c.printResults(snapshot)
}
-func (c *MaprClient) printResults() error {
+func (c *MaprClient) printResults(snapshot maprclient.SessionSnapshot) error {
var result string
var err error
var numRows int
rowsLimit := -1
- if c.query.Limit == -1 {
+ if snapshot.Query.Limit == -1 {
// Limit output to 10 rows when the result is printed to stdout.
// This can be overriden with the limit clause though.
rowsLimit = 10
}
- if c.cumulative {
- result, numRows, err = c.globalGroup.Result(c.query, rowsLimit, c.runtime.output.MaprResultRenderer())
+ if c.isCumulative(snapshot.Query) {
+ result, numRows, err = snapshot.GlobalGroup.Result(snapshot.Query, rowsLimit, c.runtime.output.MaprResultRenderer())
} else {
- result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit, c.runtime.output.MaprResultRenderer())
+ result, numRows, err = snapshot.GlobalGroup.SwapOut().Result(snapshot.Query, rowsLimit, c.runtime.output.MaprResultRenderer())
}
if err != nil {
return fmt.Errorf("unable to render mapreduce result: %w", err)
}
- if result == c.lastResult {
+ changed, ok := c.session.CommitRenderedResult(snapshot.Generation, result)
+ if !ok {
+ dlog.Client.Debug("Discarding stale mapreduce result", "generation", snapshot.Generation)
+ return nil
+ }
+ if !changed {
dlog.Client.Debug("Result hasn't changed compared to last time...")
return nil
}
- c.lastResult = result
if numRows == 0 {
dlog.Client.Debug("Empty result set this time...")
return nil
}
- rawQuery := c.runtime.output.PaintMaprRawQuery(c.query.RawQuery)
+ rawQuery := c.runtime.output.PaintMaprRawQuery(snapshot.Query.RawQuery)
dlog.Client.Raw(fmt.Sprintf("%s\n", rawQuery))
if rowsLimit > 0 && numRows > rowsLimit {
@@ -224,18 +214,77 @@ func (c *MaprClient) printResults() error {
return nil
}
-func (c *MaprClient) writeResultsToOutfile(finalResult bool) error {
- dlog.Client.Debug("writeResultsToOutfile called", "finalResult", finalResult, "cumulative", c.cumulative)
- if c.cumulative {
- if err := c.globalGroup.WriteResult(c.query, finalResult); err != nil {
+func (c *MaprClient) writeResultsToOutfile(snapshot maprclient.SessionSnapshot, finalResult bool) error {
+ cumulative := c.isCumulative(snapshot.Query)
+ dlog.Client.Debug("writeResultsToOutfile called", "finalResult", finalResult, "cumulative", cumulative, "generation", snapshot.Generation)
+ if cumulative {
+ if err := snapshot.GlobalGroup.WriteResult(snapshot.Query, finalResult); err != nil {
return fmt.Errorf("unable to write cumulative mapreduce result: %w", err)
}
dlog.Client.Debug("WriteResult completed for cumulative mode")
return nil
}
- if err := c.globalGroup.SwapOut().WriteResult(c.query, true); err != nil {
+ if err := snapshot.GlobalGroup.SwapOut().WriteResult(snapshot.Query, true); err != nil {
return fmt.Errorf("unable to write non-cumulative mapreduce result: %w", err)
}
dlog.Client.Debug("WriteResult completed for non-cumulative mode")
return nil
}
+
+func (c *MaprClient) commitSessionSpec(spec SessionSpec, generation uint64) error {
+ if spec.Query == "" {
+ return errors.New("missing mapreduce query")
+ }
+
+ query, err := c.session.CommitQuery(spec.Query, generation)
+ if err != nil {
+ return err
+ }
+
+ c.Args.QueryStr = spec.Query
+ c.setRegexForQuery(query)
+ return nil
+}
+
+func (c *MaprClient) isCumulative(query *mapr.Query) bool {
+ switch c.mode {
+ case CumulativeMode:
+ return true
+ case NonCumulativeMode:
+ return false
+ default:
+ return c.Args.Mode == omode.MapClient || (query != nil && query.HasOutfile())
+ }
+}
+
+func (c *MaprClient) setRegexForQuery(query *mapr.Query) {
+ if query == nil {
+ c.RegexStr = "."
+ return
+ }
+
+ switch query.Table {
+ case "", ".":
+ c.RegexStr = "."
+ case "*":
+ c.RegexStr = "\\|MAPREDUCE:\\|"
+ default:
+ c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", query.Table)
+ }
+}
+
+func (c *MaprClient) reportDelay(query *mapr.Query, rampUp bool) time.Duration {
+ interval := time.Second
+ if query != nil && query.Interval > 0 {
+ interval = query.Interval
+ }
+ if !rampUp {
+ return interval
+ }
+
+ delay := interval / 2
+ if delay <= 0 {
+ return interval
+ }
+ return delay
+}
diff --git a/internal/clients/maprclient_test.go b/internal/clients/maprclient_test.go
new file mode 100644
index 0000000..641a74a
--- /dev/null
+++ b/internal/clients/maprclient_test.go
@@ -0,0 +1,104 @@
+package clients
+
+import (
+ "testing"
+ "time"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/mapr"
+ maprclient "github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/omode"
+)
+
+func TestMaprClientCommitSessionSpecResetsSharedState(t *testing.T) {
+ query := mustMaprClientQuery(t, "select count(status) from stats group by status")
+ client := &MaprClient{
+ baseClient: baseClient{
+ Args: config.Args{Mode: omode.MapClient},
+ },
+ session: maprclient.NewSessionState(query),
+ mode: DefaultMode,
+ }
+ client.setRegexForQuery(query)
+
+ initial := client.session.Snapshot()
+ group := mapr.NewGroupSet()
+ set := group.GetSet("ERROR")
+ set.Samples = 1
+ set.FValues[query.Select[0].FieldStorage] = 1
+ if err := initial.GlobalGroup.Merge(query, group); err != nil {
+ t.Fatalf("Merge() error = %v", err)
+ }
+ if changed, ok := client.session.CommitRenderedResult(initial.Generation, "old-result"); !ok || !changed {
+ t.Fatalf("CommitRenderedResult() = changed:%v ok:%v, want changed and ok", changed, ok)
+ }
+
+ spec := SessionSpec{
+ Query: "select count(status) from warnings group by status",
+ }
+ if err := client.commitSessionSpec(spec, 4); err != nil {
+ t.Fatalf("commitSessionSpec() error = %v", err)
+ }
+
+ updated := client.session.Snapshot()
+ if updated.Generation != 4 {
+ t.Fatalf("generation = %d, want 4", updated.Generation)
+ }
+ if updated.Query == nil || updated.Query.RawQuery != spec.Query {
+ t.Fatalf("unexpected query after commit: %#v", updated.Query)
+ }
+ if !updated.GlobalGroup.IsEmpty() {
+ t.Fatalf("expected committed global group to be reset")
+ }
+ if updated.LastResult != "" {
+ t.Fatalf("last result = %q, want empty", updated.LastResult)
+ }
+ if client.RegexStr != "\\|MAPREDUCE:WARNINGS\\|" {
+ t.Fatalf("RegexStr = %q, want WARNINGS table regex", client.RegexStr)
+ }
+
+ sessionSpec, err := client.makeSessionSpec()
+ if err != nil {
+ t.Fatalf("makeSessionSpec() error = %v", err)
+ }
+ if sessionSpec.Query != spec.Query {
+ t.Fatalf("session spec query = %q, want %q", sessionSpec.Query, spec.Query)
+ }
+}
+
+func TestMaprClientCommitSessionSpecRejectsMissingQuery(t *testing.T) {
+ query := mustMaprClientQuery(t, "select count(status) from stats group by status")
+ client := &MaprClient{
+ baseClient: baseClient{
+ Args: config.Args{Mode: omode.MapClient},
+ },
+ session: maprclient.NewSessionState(query),
+ mode: DefaultMode,
+ }
+
+ if err := client.commitSessionSpec(SessionSpec{}, 2); err == nil {
+ t.Fatalf("expected commitSessionSpec() to reject empty query")
+ }
+}
+
+func TestMaprClientReportDelayUsesRampUpAndSteadyIntervals(t *testing.T) {
+ query := mustMaprClientQuery(t, "select count(status) from stats group by status interval 8")
+ client := &MaprClient{}
+
+ if delay := client.reportDelay(query, true); delay != 4*time.Second {
+ t.Fatalf("ramp-up delay = %v, want 4s", delay)
+ }
+ if delay := client.reportDelay(query, false); delay != 8*time.Second {
+ t.Fatalf("steady delay = %v, want 8s", delay)
+ }
+}
+
+func mustMaprClientQuery(t *testing.T, queryStr string) *mapr.Query {
+ t.Helper()
+
+ query, err := mapr.NewQuery(queryStr)
+ if err != nil {
+ t.Fatalf("NewQuery(%q) error = %v", queryStr, err)
+ }
+ return query
+}