diff options
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 8 | ||||
| -rw-r--r-- | internal/clients/maker.go | 4 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 209 | ||||
| -rw-r--r-- | internal/clients/maprclient_test.go | 104 |
4 files changed, 239 insertions, 86 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 5a16d13..d4e171c 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -5,7 +5,6 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" "github.com/mimecast/dtail/internal/protocol" ) @@ -15,13 +14,11 @@ import ( type MaprHandler struct { baseHandler aggregate *client.Aggregate - query *mapr.Query removedNl bool } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, - globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, session *client.SessionState) *MaprHandler { return &MaprHandler{ baseHandler: baseHandler{ @@ -34,8 +31,7 @@ func NewMaprHandler(server string, query *mapr.Query, capabilitiesCh: make(chan struct{}), sessionAcks: make(chan SessionAck, 4), }, - query: query, - aggregate: client.NewAggregate(server, query, globalGroup), + aggregate: client.NewAggregate(server, session), } } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index 6d75fc1..81ced09 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -15,3 +15,7 @@ type maker interface { type sessionSpecMaker interface { makeSessionSpec() (SessionSpec, error) } + +type sessionCommitter interface { + commitSessionSpec(spec SessionSpec, generation uint64) error +} diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 4bf0cf0..68770ec 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -11,6 +11,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" + maprclient "github.com/mimecast/dtail/internal/mapr/client" "github.com/mimecast/dtail/internal/omode" ) @@ -29,14 +30,10 @@ const ( // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient - // Global group set for merged mapr aggregation results - globalGroup *mapr.GlobalGroupSet - // The query object (constructed from queryStr) - query *mapr.Query - // Additative result or new result every interval run? - cumulative bool - // The last result string received - lastResult string + // Shared mapreduce state for all handlers and reporting paths. + session *maprclient.SessionState + // Selected cumulative reporting mode. + mode MaprClientMode } // NewMaprClient returns a new mapreduce client. @@ -53,19 +50,6 @@ func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient // Don't retry connection if in tail mode and no outfile specified. retry := args.Mode == omode.TailClient && !query.HasOutfile() - var cumulative bool - switch maprClientMode { - case CumulativeMode: - cumulative = true - case NonCumulativeMode: - cumulative = false - default: - // Result is comulative if we are in MapClient mode or with outfile - cumulative = args.Mode == omode.MapClient || query.HasOutfile() - } - - dlog.Client.Debug("Cumulative mapreduce mode?", cumulative) - c := MaprClient{ baseClient: baseClient{ Args: args, @@ -73,22 +57,14 @@ func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient retry: retry, runtime: newClientRuntimeBoundary(config.CurrentRuntime()), }, - query: query, - cumulative: cumulative, - } - - switch c.query.Table { - case "", ".": - c.RegexStr = "." - case "*": - c.RegexStr = "\\|MAPREDUCE:\\|" - default: - c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) + session: maprclient.NewSessionState(query), + mode: maprClientMode, } + dlog.Client.Debug("Cumulative mapreduce mode?", c.isCumulative(query)) - c.globalGroup = mapr.NewGlobalGroupSet() + c.setRegexForQuery(query) c.baseClient.init() - c.baseClient.makeConnections(c) + c.baseClient.makeConnections(&c) return &c, nil } @@ -100,7 +76,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) // Always write final result for cumulative mode (includes outfile case) - if c.cumulative { + if snapshot := c.session.Snapshot(); c.isCumulative(snapshot.Query) { dlog.Client.Debug("Writing final mapreduce result") if err := c.reportResults(true); err != nil { dlog.Client.Error("Unable to write final mapreduce result", err) @@ -113,17 +89,19 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i // NEXT: Make this a callback function rather trying to use polymorphism to call // this. This applies to all clients. It will make the code easier to read. -func (c MaprClient) makeHandler(server string) handlers.Handler { - return handlers.NewMaprHandler(server, c.query, c.globalGroup) +func (c *MaprClient) makeHandler(server string) handlers.Handler { + return handlers.NewMaprHandler(server, c.session) } -func (c MaprClient) makeSessionSpec() (SessionSpec, error) { +func (c *MaprClient) makeSessionSpec() (SessionSpec, error) { sessionSpec := NewSessionSpec(c.Args) - sessionSpec.Query = c.query.RawQuery + if snapshot := c.session.Snapshot(); snapshot.Query != nil { + sessionSpec.Query = snapshot.Query.RawQuery + } return sessionSpec, nil } -func (c MaprClient) makeCommands() (commands []string) { +func (c *MaprClient) makeCommands() (commands []string) { sessionSpec, err := c.makeSessionSpec() if err != nil { dlog.Client.FatalPanic("unable to build map session spec", err) @@ -136,84 +114,96 @@ func (c MaprClient) makeCommands() (commands []string) { } func (c *MaprClient) periodicReportResults(ctx context.Context) { - rampUpSleep := c.query.Interval / 2 - dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) + var ( + lastGeneration uint64 + seenGeneration bool + ) - if rampUpSleep > 0 { - rampUpTimer := time.NewTimer(rampUpSleep) - select { - case <-rampUpTimer.C: - case <-ctx.Done(): - if !rampUpTimer.Stop() { - select { - case <-rampUpTimer.C: - default: - } - } - return - } - } + for { + snapshot := c.session.Snapshot() + rampUp := !seenGeneration || snapshot.Generation != lastGeneration + lastGeneration = snapshot.Generation + seenGeneration = true - interval := c.query.Interval - if interval <= 0 { - interval = time.Second - } - ticker := time.NewTicker(interval) - defer ticker.Stop() + delay := c.reportDelay(snapshot.Query, rampUp) + dlog.Client.Debug("Sleeping before processing mapreduce results", "generation", snapshot.Generation, "delay", delay) - for { + timer := time.NewTimer(delay) select { - case <-ticker.C: + case <-timer.C: dlog.Client.Debug("Gathering interim mapreduce result") if err := c.reportResults(false); err != nil { dlog.Client.Error("Unable to gather mapreduce result", err) } + case <-c.session.Changes(): + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + dlog.Client.Debug("Mapreduce query generation changed, recalculating report interval") case <-ctx.Done(): + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } return } } } func (c *MaprClient) reportResults(finalResult bool) error { - if c.query.HasOutfile() { - return c.writeResultsToOutfile(finalResult) + snapshot := c.session.Snapshot() + if snapshot.Query == nil || snapshot.GlobalGroup == nil { + return nil } - return c.printResults() + + if snapshot.Query.HasOutfile() { + return c.writeResultsToOutfile(snapshot, finalResult) + } + return c.printResults(snapshot) } -func (c *MaprClient) printResults() error { +func (c *MaprClient) printResults(snapshot maprclient.SessionSnapshot) error { var result string var err error var numRows int rowsLimit := -1 - if c.query.Limit == -1 { + if snapshot.Query.Limit == -1 { // Limit output to 10 rows when the result is printed to stdout. // This can be overriden with the limit clause though. rowsLimit = 10 } - if c.cumulative { - result, numRows, err = c.globalGroup.Result(c.query, rowsLimit, c.runtime.output.MaprResultRenderer()) + if c.isCumulative(snapshot.Query) { + result, numRows, err = snapshot.GlobalGroup.Result(snapshot.Query, rowsLimit, c.runtime.output.MaprResultRenderer()) } else { - result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit, c.runtime.output.MaprResultRenderer()) + result, numRows, err = snapshot.GlobalGroup.SwapOut().Result(snapshot.Query, rowsLimit, c.runtime.output.MaprResultRenderer()) } if err != nil { return fmt.Errorf("unable to render mapreduce result: %w", err) } - if result == c.lastResult { + changed, ok := c.session.CommitRenderedResult(snapshot.Generation, result) + if !ok { + dlog.Client.Debug("Discarding stale mapreduce result", "generation", snapshot.Generation) + return nil + } + if !changed { dlog.Client.Debug("Result hasn't changed compared to last time...") return nil } - c.lastResult = result if numRows == 0 { dlog.Client.Debug("Empty result set this time...") return nil } - rawQuery := c.runtime.output.PaintMaprRawQuery(c.query.RawQuery) + rawQuery := c.runtime.output.PaintMaprRawQuery(snapshot.Query.RawQuery) dlog.Client.Raw(fmt.Sprintf("%s\n", rawQuery)) if rowsLimit > 0 && numRows > rowsLimit { @@ -224,18 +214,77 @@ func (c *MaprClient) printResults() error { return nil } -func (c *MaprClient) writeResultsToOutfile(finalResult bool) error { - dlog.Client.Debug("writeResultsToOutfile called", "finalResult", finalResult, "cumulative", c.cumulative) - if c.cumulative { - if err := c.globalGroup.WriteResult(c.query, finalResult); err != nil { +func (c *MaprClient) writeResultsToOutfile(snapshot maprclient.SessionSnapshot, finalResult bool) error { + cumulative := c.isCumulative(snapshot.Query) + dlog.Client.Debug("writeResultsToOutfile called", "finalResult", finalResult, "cumulative", cumulative, "generation", snapshot.Generation) + if cumulative { + if err := snapshot.GlobalGroup.WriteResult(snapshot.Query, finalResult); err != nil { return fmt.Errorf("unable to write cumulative mapreduce result: %w", err) } dlog.Client.Debug("WriteResult completed for cumulative mode") return nil } - if err := c.globalGroup.SwapOut().WriteResult(c.query, true); err != nil { + if err := snapshot.GlobalGroup.SwapOut().WriteResult(snapshot.Query, true); err != nil { return fmt.Errorf("unable to write non-cumulative mapreduce result: %w", err) } dlog.Client.Debug("WriteResult completed for non-cumulative mode") return nil } + +func (c *MaprClient) commitSessionSpec(spec SessionSpec, generation uint64) error { + if spec.Query == "" { + return errors.New("missing mapreduce query") + } + + query, err := c.session.CommitQuery(spec.Query, generation) + if err != nil { + return err + } + + c.Args.QueryStr = spec.Query + c.setRegexForQuery(query) + return nil +} + +func (c *MaprClient) isCumulative(query *mapr.Query) bool { + switch c.mode { + case CumulativeMode: + return true + case NonCumulativeMode: + return false + default: + return c.Args.Mode == omode.MapClient || (query != nil && query.HasOutfile()) + } +} + +func (c *MaprClient) setRegexForQuery(query *mapr.Query) { + if query == nil { + c.RegexStr = "." + return + } + + switch query.Table { + case "", ".": + c.RegexStr = "." + case "*": + c.RegexStr = "\\|MAPREDUCE:\\|" + default: + c.RegexStr = fmt.Sprintf("\\|MAPREDUCE:%s\\|", query.Table) + } +} + +func (c *MaprClient) reportDelay(query *mapr.Query, rampUp bool) time.Duration { + interval := time.Second + if query != nil && query.Interval > 0 { + interval = query.Interval + } + if !rampUp { + return interval + } + + delay := interval / 2 + if delay <= 0 { + return interval + } + return delay +} diff --git a/internal/clients/maprclient_test.go b/internal/clients/maprclient_test.go new file mode 100644 index 0000000..641a74a --- /dev/null +++ b/internal/clients/maprclient_test.go @@ -0,0 +1,104 @@ +package clients + +import ( + "testing" + "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/mapr" + maprclient "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/omode" +) + +func TestMaprClientCommitSessionSpecResetsSharedState(t *testing.T) { + query := mustMaprClientQuery(t, "select count(status) from stats group by status") + client := &MaprClient{ + baseClient: baseClient{ + Args: config.Args{Mode: omode.MapClient}, + }, + session: maprclient.NewSessionState(query), + mode: DefaultMode, + } + client.setRegexForQuery(query) + + initial := client.session.Snapshot() + group := mapr.NewGroupSet() + set := group.GetSet("ERROR") + set.Samples = 1 + set.FValues[query.Select[0].FieldStorage] = 1 + if err := initial.GlobalGroup.Merge(query, group); err != nil { + t.Fatalf("Merge() error = %v", err) + } + if changed, ok := client.session.CommitRenderedResult(initial.Generation, "old-result"); !ok || !changed { + t.Fatalf("CommitRenderedResult() = changed:%v ok:%v, want changed and ok", changed, ok) + } + + spec := SessionSpec{ + Query: "select count(status) from warnings group by status", + } + if err := client.commitSessionSpec(spec, 4); err != nil { + t.Fatalf("commitSessionSpec() error = %v", err) + } + + updated := client.session.Snapshot() + if updated.Generation != 4 { + t.Fatalf("generation = %d, want 4", updated.Generation) + } + if updated.Query == nil || updated.Query.RawQuery != spec.Query { + t.Fatalf("unexpected query after commit: %#v", updated.Query) + } + if !updated.GlobalGroup.IsEmpty() { + t.Fatalf("expected committed global group to be reset") + } + if updated.LastResult != "" { + t.Fatalf("last result = %q, want empty", updated.LastResult) + } + if client.RegexStr != "\\|MAPREDUCE:WARNINGS\\|" { + t.Fatalf("RegexStr = %q, want WARNINGS table regex", client.RegexStr) + } + + sessionSpec, err := client.makeSessionSpec() + if err != nil { + t.Fatalf("makeSessionSpec() error = %v", err) + } + if sessionSpec.Query != spec.Query { + t.Fatalf("session spec query = %q, want %q", sessionSpec.Query, spec.Query) + } +} + +func TestMaprClientCommitSessionSpecRejectsMissingQuery(t *testing.T) { + query := mustMaprClientQuery(t, "select count(status) from stats group by status") + client := &MaprClient{ + baseClient: baseClient{ + Args: config.Args{Mode: omode.MapClient}, + }, + session: maprclient.NewSessionState(query), + mode: DefaultMode, + } + + if err := client.commitSessionSpec(SessionSpec{}, 2); err == nil { + t.Fatalf("expected commitSessionSpec() to reject empty query") + } +} + +func TestMaprClientReportDelayUsesRampUpAndSteadyIntervals(t *testing.T) { + query := mustMaprClientQuery(t, "select count(status) from stats group by status interval 8") + client := &MaprClient{} + + if delay := client.reportDelay(query, true); delay != 4*time.Second { + t.Fatalf("ramp-up delay = %v, want 4s", delay) + } + if delay := client.reportDelay(query, false); delay != 8*time.Second { + t.Fatalf("steady delay = %v, want 8s", delay) + } +} + +func mustMaprClientQuery(t *testing.T, queryStr string) *mapr.Query { + t.Helper() + + query, err := mapr.NewQuery(queryStr) + if err != nil { + t.Fatalf("NewQuery(%q) error = %v", queryStr, err) + } + return query +} |
