summaryrefslogtreecommitdiff
path: root/internal/mapr/client/aggregate_test.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 09:29:59 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 09:29:59 +0200
commitd8f88d455990636bb797643dee7d39a95bbbd62c (patch)
tree8c8447fc975ec6deebe48218d27e3defa1b3dcce /internal/mapr/client/aggregate_test.go
parent7a79d0a8bf58b05dfbae331d00275739530b9584 (diff)
task 4abe7505: reset dmap generation state
Diffstat (limited to 'internal/mapr/client/aggregate_test.go')
-rw-r--r--internal/mapr/client/aggregate_test.go70
1 files changed, 70 insertions, 0 deletions
diff --git a/internal/mapr/client/aggregate_test.go b/internal/mapr/client/aggregate_test.go
new file mode 100644
index 0000000..8ac94a1
--- /dev/null
+++ b/internal/mapr/client/aggregate_test.go
@@ -0,0 +1,70 @@
+package client
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+func TestAggregateResetsPendingLocalStateOnGenerationChange(t *testing.T) {
+ query := mustSessionStateQuery(t, "select status,count(status) from stats group by status")
+ state := NewSessionState(query)
+ aggregate := NewAggregate("srv1", state)
+ countStorage := aggregateCountStorage(t, query)
+
+ oldSet := aggregate.group.GetSet("ERROR")
+ oldSet.Samples = 1
+ oldSet.FValues[countStorage] = 1
+
+ rawQuery := "select status,count(status) from warnings group by status"
+ if _, err := state.CommitQuery(rawQuery, 2); err != nil {
+ t.Fatalf("CommitQuery() error = %v", err)
+ }
+
+ snapshot := state.Snapshot()
+ message := strings.Join([]string{
+ "WARN",
+ "1",
+ aggregateCountStorage(t, snapshot.Query) + protocol.AggregateKVDelimiter + "1",
+ "",
+ }, protocol.AggregateDelimiter)
+
+ if err := aggregate.Aggregate(message); err != nil {
+ t.Fatalf("Aggregate() error = %v", err)
+ }
+
+ result, numRows, err := snapshot.GlobalGroup.Result(snapshot.Query, 10, nil)
+ if err != nil {
+ t.Fatalf("Result() error = %v", err)
+ }
+ if numRows != 1 {
+ t.Fatalf("numRows = %d, want 1", numRows)
+ }
+ if !strings.Contains(result, "1") {
+ t.Fatalf("expected one new-generation aggregate row, got %q", result)
+ }
+}
+
+func TestAggregateRejectsMalformedMessage(t *testing.T) {
+ query := mustSessionStateQuery(t, "select count(status) from stats group by status")
+ state := NewSessionState(query)
+ aggregate := NewAggregate("srv1", state)
+
+ if err := aggregate.Aggregate("broken"); err == nil {
+ t.Fatalf("expected Aggregate() to reject malformed messages")
+ }
+}
+
+func aggregateCountStorage(t *testing.T, query *mapr.Query) string {
+ t.Helper()
+
+ for _, selectCondition := range query.Select {
+ if selectCondition.Operation == mapr.Count {
+ return selectCondition.FieldStorage
+ }
+ }
+ t.Fatalf("query %q does not contain count() storage", query.RawQuery)
+ return ""
+}