diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-03 11:20:42 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-03 11:20:42 +0200 |
| commit | 8762a54a74da48b304e49e23bd2b2eeb8f541f67 (patch) | |
| tree | bba472803a19e370fcf24fe7d7c7f7b6c343f323 | |
| parent | 77661ef1c646a5ef8e6964afa5571c756d1fd31d (diff) | |
Add percentage and percentile mapr aggregators
| -rw-r--r-- | doc/querylanguage.md | 2 | ||||
| -rw-r--r-- | internal/mapr/aggregateset.go | 8 | ||||
| -rw-r--r-- | internal/mapr/groupset.go | 58 | ||||
| -rw-r--r-- | internal/mapr/groupset_percentage_test.go | 73 | ||||
| -rw-r--r-- | internal/mapr/query_test.go | 36 | ||||
| -rw-r--r-- | internal/mapr/selectcondition.go | 6 |
6 files changed, 180 insertions, 3 deletions
diff --git a/doc/querylanguage.md b/doc/querylanguage.md index c3e567e..fab387b 100644 --- a/doc/querylanguage.md +++ b/doc/querylanguage.md @@ -52,7 +52,7 @@ STRINGOPERATOR := eq|ne|contains|ncontains|lacks|hasprefix|nhasprefix|hassuffix| ORDERFIELD := FIELD|AGGREGATION(FIELD) SET := $VARIABLE = FLOAT|STRING|FIELD|FUNCTION(FIELD) LOGFORMAT := default|generic|generickv|... -AGGREGATION := count|sum|min|max|avg|last|len +AGGREGATION := count|sum|min|max|avg|last|len|percentage|percentile FUNCTION := md5sum|maskdigits ``` diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index c50c7a1..263ef35 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -46,6 +46,10 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { case Sum: fallthrough case Avg: + fallthrough + case Percentage: + fallthrough + case Percentile: value := set.FValues[storage] s.addFloat(storage, value) case Min: @@ -177,6 +181,10 @@ func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value strin case Sum: fallthrough case Avg: + fallthrough + case Percentage: + fallthrough + case Percentile: s.addFloat(key, f) case Min: s.addFloatMin(key, f) diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index 9d7661a..144f2cb 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -24,6 +24,11 @@ type result struct { orderBy float64 } +type resultStats struct { + percentageTotals map[string]float64 + percentileValues map[string][]float64 +} + // NewGroupSet returns a new empty group set. func NewGroupSet() *GroupSet { g := GroupSet{} @@ -67,12 +72,13 @@ func (g *GroupSet) result(query *Query, gathercolumnWidths bool) ([]result, []in // not a CSV file). columnWidths := make([]int, len(query.Select)) var valueStrLen int + stats := g.makeResultStats(query) for groupKey, set := range g.sets { result := result{groupKey: groupKey} for i, sc := range query.Select { - if valueStrLen, err = g.resultSelect(query, &sc, set, &result); err != nil { + if valueStrLen, err = g.resultSelect(query, &sc, set, &result, &stats); err != nil { return rows, columnWidths, err } @@ -96,7 +102,7 @@ func (g *GroupSet) result(query *Query, gathercolumnWidths bool) ([]result, []in } func (*GroupSet) resultSelect(query *Query, sc *selectCondition, set *AggregateSet, - result *result) (int, error) { + result *result, stats *resultStats) (int, error) { var valueStr string var value float64 @@ -120,6 +126,18 @@ func (*GroupSet) resultSelect(query *Query, sc *selectCondition, set *AggregateS case Avg: value = set.FValues[sc.FieldStorage] / float64(set.Samples) valueStr = fmt.Sprintf("%f", value) + case Percentage: + value = set.FValues[sc.FieldStorage] + total := stats.percentageTotals[sc.FieldStorage] + if total == 0 { + value = 0 + } else { + value = (value / total) * 100 + } + valueStr = fmt.Sprintf("%f", value) + case Percentile: + value = percentileRank(set.FValues[sc.FieldStorage], stats.percentileValues[sc.FieldStorage]) + valueStr = fmt.Sprintf("%f", value) default: return 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) } @@ -132,6 +150,42 @@ func (*GroupSet) resultSelect(query *Query, sc *selectCondition, set *AggregateS return len(valueStr), nil } +func (g *GroupSet) makeResultStats(query *Query) resultStats { + stats := resultStats{ + percentageTotals: make(map[string]float64), + percentileValues: make(map[string][]float64), + } + + for _, set := range g.sets { + for _, sc := range query.Select { + value := set.FValues[sc.FieldStorage] + switch sc.Operation { + case Percentage: + stats.percentageTotals[sc.FieldStorage] += value + case Percentile: + stats.percentileValues[sc.FieldStorage] = append(stats.percentileValues[sc.FieldStorage], value) + } + } + } + + for storage := range stats.percentileValues { + sort.Float64s(stats.percentileValues[storage]) + } + + return stats +} + +func percentileRank(value float64, sortedValues []float64) float64 { + if len(sortedValues) == 0 { + return 0 + } + + upperBound := sort.Search(len(sortedValues), func(i int) bool { + return sortedValues[i] > value + }) + return (float64(upperBound) / float64(len(sortedValues))) * 100 +} + func (*GroupSet) resultOrderBy(query *Query, rows []result) { if query.OrderBy == "" { return diff --git a/internal/mapr/groupset_percentage_test.go b/internal/mapr/groupset_percentage_test.go new file mode 100644 index 0000000..94a9fc8 --- /dev/null +++ b/internal/mapr/groupset_percentage_test.go @@ -0,0 +1,73 @@ +package mapr + +import ( + "strconv" + "testing" +) + +func TestGroupSetResultPercentageAndPercentile(t *testing.T) { + query, err := NewQuery("select percentage(value),percentile(value) from stats group by host order by percentage(value)") + if err != nil { + t.Fatalf("Unable to parse query: %v", err) + } + + groupSet := NewGroupSet() + + setA := groupSet.GetSet("host-a") + if err := setA.Aggregate("percentage(value)", Percentage, "10", false); err != nil { + t.Fatalf("Unable to aggregate percentage for host-a: %v", err) + } + if err := setA.Aggregate("percentile(value)", Percentile, "10", false); err != nil { + t.Fatalf("Unable to aggregate percentile for host-a: %v", err) + } + + setB := groupSet.GetSet("host-b") + if err := setB.Aggregate("percentage(value)", Percentage, "30", false); err != nil { + t.Fatalf("Unable to aggregate percentage for host-b: %v", err) + } + if err := setB.Aggregate("percentile(value)", Percentile, "30", false); err != nil { + t.Fatalf("Unable to aggregate percentile for host-b: %v", err) + } + + rows, _, err := groupSet.result(query, false) + if err != nil { + t.Fatalf("Unable to build result rows: %v", err) + } + if len(rows) != 2 { + t.Fatalf("Expected 2 result rows, got %d", len(rows)) + } + + if rows[0].groupKey != "host-b" { + t.Fatalf("Expected rows to be ordered by percentage descending, first row=%s", rows[0].groupKey) + } + + valuesByGroup := map[string][]float64{} + for _, row := range rows { + parsedValues := make([]float64, 0, len(row.values)) + for _, value := range row.values { + parsedValue, err := strconv.ParseFloat(value, 64) + if err != nil { + t.Fatalf("Unable to parse result value %q: %v", value, err) + } + parsedValues = append(parsedValues, parsedValue) + } + valuesByGroup[row.groupKey] = parsedValues + } + + assertAlmostEqual(t, valuesByGroup["host-a"][0], 25.0, 0.0001, "host-a percentage") + assertAlmostEqual(t, valuesByGroup["host-a"][1], 50.0, 0.0001, "host-a percentile") + assertAlmostEqual(t, valuesByGroup["host-b"][0], 75.0, 0.0001, "host-b percentage") + assertAlmostEqual(t, valuesByGroup["host-b"][1], 100.0, 0.0001, "host-b percentile") +} + +func assertAlmostEqual(t *testing.T, got, expected, tolerance float64, label string) { + t.Helper() + + diff := got - expected + if diff < 0 { + diff = -diff + } + if diff > tolerance { + t.Fatalf("Unexpected %s: got=%f expected=%f tolerance=%f", label, got, expected, tolerance) + } +} diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go index f37b8d4..4207fe7 100644 --- a/internal/mapr/query_test.go +++ b/internal/mapr/query_test.go @@ -292,3 +292,39 @@ func TestQuotedSelectCondition(t *testing.T) { "'select' clause but got '%v': %s\n%v", q.Select[3].Operation, queryStr, q) } } + +func TestParseQueryPercentageAndPercentile(t *testing.T) { + queryStr := "select percentage($value),percentile($value) from stats group by $host order by percentile($value)" + + q, err := NewQuery(queryStr) + if err != nil { + t.Errorf("Query parse error: %s\n%v: %v", queryStr, q, err) + } + if len(q.Select) != 2 { + t.Fatalf("Expected two elements in 'select' clause but got '%v': %s\n%v", + q.Select, queryStr, q) + } + + if q.Select[0].Operation != Percentage { + t.Errorf("Expected 'percentage' as first select aggregation but got '%v': %s\n%v", + q.Select[0].Operation, queryStr, q) + } + if q.Select[0].FieldStorage != "percentage($value)" { + t.Errorf("Expected percentage field storage but got '%v': %s\n%v", + q.Select[0].FieldStorage, queryStr, q) + } + + if q.Select[1].Operation != Percentile { + t.Errorf("Expected 'percentile' as second select aggregation but got '%v': %s\n%v", + q.Select[1].Operation, queryStr, q) + } + if q.Select[1].FieldStorage != "percentile($value)" { + t.Errorf("Expected percentile field storage but got '%v': %s\n%v", + q.Select[1].FieldStorage, queryStr, q) + } + + if q.OrderBy != "percentile($value)" { + t.Errorf("Expected order by percentile($value) but got '%v': %s\n%v", + q.OrderBy, queryStr, q) + } +} diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index 78359c7..3ea4355 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -19,6 +19,8 @@ const ( Last AggregateOperation = iota Avg AggregateOperation = iota Len AggregateOperation = iota + Percentage AggregateOperation = iota + Percentile AggregateOperation = iota ) // Represents a parsed "select" clause, used by mapr.Query. @@ -81,6 +83,10 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { sc.Operation = Avg case "len": sc.Operation = Len + case "percentage": + sc.Operation = Percentage + case "percentile": + sc.Operation = Percentile default: return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg) } |
