summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2026-03-03 11:20:42 +0200
committer Paul Buetow <paul@buetow.org> 2026-03-03 11:20:42 +0200
commit 8762a54a74da48b304e49e23bd2b2eeb8f541f67 (patch)
tree bba472803a19e370fcf24fe7d7c7f7b6c343f323
parent 77661ef1c646a5ef8e6964afa5571c756d1fd31d (diff)
Add percentage and percentile mapr aggregators
-rw-r--r-- doc/querylanguage.md 2
-rw-r--r-- internal/mapr/aggregateset.go 8
-rw-r--r-- internal/mapr/groupset.go 58
-rw-r--r-- internal/mapr/groupset_percentage_test.go 73
-rw-r--r-- internal/mapr/query_test.go 36
-rw-r--r-- internal/mapr/selectcondition.go 6
6 files changed, 180 insertions, 3 deletions
diff --git a/doc/querylanguage.md b/doc/querylanguage.md
index c3e567e..fab387b 100644
--- a/doc/querylanguage.md
+++ b/doc/querylanguage.md
@@ -52,7 +52,7 @@ STRINGOPERATOR := eq|ne|contains|ncontains|lacks|hasprefix|nhasprefix|hassuffix|
ORDERFIELD := FIELD|AGGREGATION(FIELD)
SET := $VARIABLE = FLOAT|STRING|FIELD|FUNCTION(FIELD)
LOGFORMAT := default|generic|generickv|...
-AGGREGATION := count|sum|min|max|avg|last|len
+AGGREGATION := count|sum|min|max|avg|last|len|percentage|percentile
FUNCTION := md5sum|maskdigits
```
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index c50c7a1..263ef35 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -46,6 +46,10 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
case Sum:
fallthrough
case Avg:
+ fallthrough
+ case Percentage:
+ fallthrough
+ case Percentile:
value := set.FValues[storage]
s.addFloat(storage, value)
case Min:
@@ -177,6 +181,10 @@ func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value strin
case Sum:
fallthrough
case Avg:
+ fallthrough
+ case Percentage:
+ fallthrough
+ case Percentile:
s.addFloat(key, f)
case Min:
s.addFloatMin(key, f)
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index 9d7661a..144f2cb 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -24,6 +24,11 @@ type result struct {
orderBy float64
}
+// resultStats holds values computed across all groups of a GroupSet that the
+// percentage and percentile aggregations need when a single group is rendered.
+// Both maps are keyed by the select condition's FieldStorage.
+type resultStats struct {
+ // percentageTotals is the sum of the per-group values of a storage key;
+ // it is the divisor used when a group's percentage is computed.
+ percentageTotals map[string]float64
+ // percentileValues lists the per-group values of a storage key, sorted in
+ // ascending order so a rank can be found with a binary search.
+ percentileValues map[string][]float64
+}
+
// NewGroupSet returns a new empty group set.
func NewGroupSet() *GroupSet {
g := GroupSet{}
@@ -67,12 +72,13 @@ func (g *GroupSet) result(query *Query, gathercolumnWidths bool) ([]result, []in
// not a CSV file).
columnWidths := make([]int, len(query.Select))
var valueStrLen int
+ stats := g.makeResultStats(query)
for groupKey, set := range g.sets {
result := result{groupKey: groupKey}
for i, sc := range query.Select {
- if valueStrLen, err = g.resultSelect(query, &sc, set, &result); err != nil {
+ if valueStrLen, err = g.resultSelect(query, &sc, set, &result, &stats); err != nil {
return rows, columnWidths, err
}
@@ -96,7 +102,7 @@ func (g *GroupSet) result(query *Query, gathercolumnWidths bool) ([]result, []in
}
func (*GroupSet) resultSelect(query *Query, sc *selectCondition, set *AggregateSet,
- result *result) (int, error) {
+ result *result, stats *resultStats) (int, error) {
var valueStr string
var value float64
@@ -120,6 +126,18 @@ func (*GroupSet) resultSelect(query *Query, sc *selectCondition, set *AggregateS
case Avg:
value = set.FValues[sc.FieldStorage] / float64(set.Samples)
valueStr = fmt.Sprintf("%f", value)
+ case Percentage:
+ value = set.FValues[sc.FieldStorage]
+ total := stats.percentageTotals[sc.FieldStorage]
+ if total == 0 {
+ value = 0
+ } else {
+ value = (value / total) * 100
+ }
+ valueStr = fmt.Sprintf("%f", value)
+ case Percentile:
+ value = percentileRank(set.FValues[sc.FieldStorage], stats.percentileValues[sc.FieldStorage])
+ valueStr = fmt.Sprintf("%f", value)
default:
return 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
}
@@ -132,6 +150,42 @@ func (*GroupSet) resultSelect(query *Query, sc *selectCondition, set *AggregateS
return len(valueStr), nil
}
+// makeResultStats pre-computes the cross-group statistics required by the
+// percentage and percentile aggregations of query.
+//
+// For every distinct FieldStorage selected with Percentage it sums the value
+// of that storage over all groups; for every distinct FieldStorage selected
+// with Percentile it collects the value of that storage from all groups and
+// sorts them ascending.
+//
+// Each storage key is accounted for exactly once per group, even when the
+// same aggregation appears several times in the select clause. Counting it
+// once per occurrence would inflate the percentage total and shrink every
+// reported percentage.
+func (g *GroupSet) makeResultStats(query *Query) resultStats {
+	stats := resultStats{
+		percentageTotals: make(map[string]float64),
+		percentileValues: make(map[string][]float64),
+	}
+
+	// Register every distinct storage key first; the maps deduplicate
+	// repeated select conditions.
+	for _, sc := range query.Select {
+		switch sc.Operation {
+		case Percentage:
+			stats.percentageTotals[sc.FieldStorage] = 0
+		case Percentile:
+			if _, seen := stats.percentileValues[sc.FieldStorage]; !seen {
+				stats.percentileValues[sc.FieldStorage] = make([]float64, 0, len(g.sets))
+			}
+		}
+	}
+
+	// Only existing keys are updated while ranging, which is safe for maps.
+	for _, set := range g.sets {
+		for storage := range stats.percentageTotals {
+			stats.percentageTotals[storage] += set.FValues[storage]
+		}
+		for storage, values := range stats.percentileValues {
+			stats.percentileValues[storage] = append(values, set.FValues[storage])
+		}
+	}
+
+	// percentileRank relies on ascending order for its binary search.
+	for _, values := range stats.percentileValues {
+		sort.Float64s(values)
+	}
+
+	return stats
+}
+
+// percentileRank returns, as a number between 0 and 100, the share of entries
+// in sortedValues that are less than or equal to value. sortedValues must be
+// sorted in ascending order. An empty slice yields 0.
+func percentileRank(value float64, sortedValues []float64) float64 {
+	total := len(sortedValues)
+	if total < 1 {
+		return 0
+	}
+
+	// The index of the first entry strictly greater than value equals the
+	// number of entries at or below value.
+	atOrBelow := sort.Search(total, func(idx int) bool {
+		return value < sortedValues[idx]
+	})
+	return (float64(atOrBelow) / float64(total)) * 100
+}
+
func (*GroupSet) resultOrderBy(query *Query, rows []result) {
if query.OrderBy == "" {
return
diff --git a/internal/mapr/groupset_percentage_test.go b/internal/mapr/groupset_percentage_test.go
new file mode 100644
index 0000000..94a9fc8
--- /dev/null
+++ b/internal/mapr/groupset_percentage_test.go
@@ -0,0 +1,73 @@
+package mapr
+
+import (
+ "strconv"
+ "testing"
+)
+
+// TestGroupSetResultPercentageAndPercentile aggregates one value for each of
+// two groups (host-a: 10, host-b: 30) and verifies the rendered result rows:
+// the percentages must be 25 and 75, the percentile ranks 50 and 100, and
+// host-b must be the first row.
+func TestGroupSetResultPercentageAndPercentile(t *testing.T) {
+	query, err := NewQuery("select percentage(value),percentile(value) from stats group by host order by percentage(value)")
+	if err != nil {
+		t.Fatalf("Unable to parse query: %v", err)
+	}
+
+	groupSet := NewGroupSet()
+
+	setA := groupSet.GetSet("host-a")
+	if err := setA.Aggregate("percentage(value)", Percentage, "10", false); err != nil {
+		t.Fatalf("Unable to aggregate percentage for host-a: %v", err)
+	}
+	if err := setA.Aggregate("percentile(value)", Percentile, "10", false); err != nil {
+		t.Fatalf("Unable to aggregate percentile for host-a: %v", err)
+	}
+
+	setB := groupSet.GetSet("host-b")
+	if err := setB.Aggregate("percentage(value)", Percentage, "30", false); err != nil {
+		t.Fatalf("Unable to aggregate percentage for host-b: %v", err)
+	}
+	if err := setB.Aggregate("percentile(value)", Percentile, "30", false); err != nil {
+		t.Fatalf("Unable to aggregate percentile for host-b: %v", err)
+	}
+
+	rows, _, err := groupSet.result(query, false)
+	if err != nil {
+		t.Fatalf("Unable to build result rows: %v", err)
+	}
+	if len(rows) != 2 {
+		t.Fatalf("Expected 2 result rows, got %d", len(rows))
+	}
+
+	if rows[0].groupKey != "host-b" {
+		t.Fatalf("Expected rows to be ordered by percentage descending, first row=%s", rows[0].groupKey)
+	}
+
+	valuesByGroup := map[string][]float64{}
+	for _, row := range rows {
+		parsedValues := make([]float64, 0, len(row.values))
+		for _, value := range row.values {
+			parsedValue, err := strconv.ParseFloat(value, 64)
+			if err != nil {
+				t.Fatalf("Unable to parse result value %q: %v", value, err)
+			}
+			parsedValues = append(parsedValues, parsedValue)
+		}
+		valuesByGroup[row.groupKey] = parsedValues
+	}
+
+	// Guard the index expressions below: a missing group or a short row
+	// should be reported as a test failure, not as an index-out-of-range panic.
+	for _, host := range []string{"host-a", "host-b"} {
+		if got := len(valuesByGroup[host]); got != 2 {
+			t.Fatalf("Expected 2 result values for %s, got %d", host, got)
+		}
+	}
+
+	assertAlmostEqual(t, valuesByGroup["host-a"][0], 25.0, 0.0001, "host-a percentage")
+	assertAlmostEqual(t, valuesByGroup["host-a"][1], 50.0, 0.0001, "host-a percentile")
+	assertAlmostEqual(t, valuesByGroup["host-b"][0], 75.0, 0.0001, "host-b percentage")
+	assertAlmostEqual(t, valuesByGroup["host-b"][1], 100.0, 0.0001, "host-b percentile")
+}
+
+// assertAlmostEqual fails the test immediately unless got lies within
+// tolerance of expected. label names the checked quantity in the failure
+// message. A NaN result is treated as a failure.
+func assertAlmostEqual(t *testing.T, got, expected, tolerance float64, label string) {
+	t.Helper()
+
+	diff := got - expected
+	if diff < 0 {
+		diff = -diff
+	}
+	// Negated on purpose: NaN compares false against everything, so the
+	// plain "diff > tolerance" check would let a NaN result pass silently.
+	if !(diff <= tolerance) {
+		t.Fatalf("Unexpected %s: got=%f expected=%f tolerance=%f", label, got, expected, tolerance)
+	}
+}
diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go
index f37b8d4..4207fe7 100644
--- a/internal/mapr/query_test.go
+++ b/internal/mapr/query_test.go
@@ -292,3 +292,39 @@ func TestQuotedSelectCondition(t *testing.T) {
"'select' clause but got '%v': %s\n%v", q.Select[3].Operation, queryStr, q)
}
}
+
+// TestParseQueryPercentageAndPercentile parses a query that selects and
+// orders by the percentage and percentile aggregations and checks the parsed
+// operations, their field storage keys and the order-by expression.
+func TestParseQueryPercentageAndPercentile(t *testing.T) {
+	queryStr := "select percentage($value),percentile($value) from stats group by $host order by percentile($value)"
+
+	q, err := NewQuery(queryStr)
+	if err != nil {
+		// Stop here: every check below reads from q, which cannot be
+		// trusted after a parse error.
+		t.Fatalf("Query parse error: %s\n%v: %v", queryStr, q, err)
+	}
+	if len(q.Select) != 2 {
+		t.Fatalf("Expected two elements in 'select' clause but got '%v': %s\n%v",
+			q.Select, queryStr, q)
+	}
+
+	if q.Select[0].Operation != Percentage {
+		t.Errorf("Expected 'percentage' as first select aggregation but got '%v': %s\n%v",
+			q.Select[0].Operation, queryStr, q)
+	}
+	if q.Select[0].FieldStorage != "percentage($value)" {
+		t.Errorf("Expected percentage field storage but got '%v': %s\n%v",
+			q.Select[0].FieldStorage, queryStr, q)
+	}
+
+	if q.Select[1].Operation != Percentile {
+		t.Errorf("Expected 'percentile' as second select aggregation but got '%v': %s\n%v",
+			q.Select[1].Operation, queryStr, q)
+	}
+	if q.Select[1].FieldStorage != "percentile($value)" {
+		t.Errorf("Expected percentile field storage but got '%v': %s\n%v",
+			q.Select[1].FieldStorage, queryStr, q)
+	}
+
+	if q.OrderBy != "percentile($value)" {
+		t.Errorf("Expected order by percentile($value) but got '%v': %s\n%v",
+			q.OrderBy, queryStr, q)
+	}
+}
diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go
index 78359c7..3ea4355 100644
--- a/internal/mapr/selectcondition.go
+++ b/internal/mapr/selectcondition.go
@@ -19,6 +19,8 @@ const (
Last AggregateOperation = iota
Avg AggregateOperation = iota
Len AggregateOperation = iota
+ Percentage AggregateOperation = iota
+ Percentile AggregateOperation = iota
)
// Represents a parsed "select" clause, used by mapr.Query.
@@ -81,6 +83,10 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) {
sc.Operation = Avg
case "len":
sc.Operation = Len
+ case "percentage":
+ sc.Operation = Percentage
+ case "percentile":
+ sc.Operation = Percentile
default:
return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg)
}