diff options
Diffstat (limited to 'mapr/aggregateset.go')
| -rw-r--r-- | mapr/aggregateset.go | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/mapr/aggregateset.go b/mapr/aggregateset.go new file mode 100644 index 0000000..2096c3c --- /dev/null +++ b/mapr/aggregateset.go @@ -0,0 +1,185 @@ +package mapr + +import ( + "fmt" + "strconv" + "strings" +) + +// AggregateSet represents aggregated key/value pairs from the +// MAPREDUCE log lines. These could be either string values or float +// values. +type AggregateSet struct { + Samples int + FValues map[string]float64 + SValues map[string]string +} + +// NewAggregateSet creates a new empty aggregate set. +func NewAggregateSet() *AggregateSet { + return &AggregateSet{ + FValues: make(map[string]float64), + SValues: make(map[string]string), + } +} + +// String representation of aggregate set. +func (s *AggregateSet) String() string { + return fmt.Sprintf("AggregateSet(Samples:%d,FValues:%v,SValues:%v)", + s.Samples, s.FValues, s.SValues) +} + +// Merge one aggregate set into this one. +func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { + s.Samples += set.Samples + //logger.Trace("Merge", set) + + for _, sc := range query.Select { + storage := sc.FieldStorage + switch sc.Operation { + case Count: + fallthrough + case Sum: + fallthrough + case Avg: + value := set.FValues[storage] + s.addFloat(storage, value) + case Min: + value := set.FValues[storage] + s.addFloatMin(storage, value) + case Max: + value := set.FValues[storage] + s.addFloatMax(storage, value) + case Last: + value := set.SValues[storage] + s.setString(storage, value) + case Len: + s.setString(storage, set.SValues[storage]) + s.setFloat(storage, set.FValues[storage]) + default: + return fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + } + } + return nil +} + +// Serialize the aggregate set so it can be sent over the wire. +func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan struct{}) { + //logger.Trace("Serialising mapr.AggregateSet", s) + var sb strings.Builder + + sb.WriteString(groupKey) + sb.WriteString("|") + sb.WriteString(fmt.Sprintf("%d|", s.Samples)) + + for k, v := range s.FValues { + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(fmt.Sprintf("%v|", v)) + } + + for k, v := range s.SValues { + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(v) + sb.WriteString("|") + } + + select { + case ch <- sb.String(): + case <-stop: + } +} + +// Add a float value. +func (s *AggregateSet) addFloat(key string, value float64) { + if _, ok := s.FValues[key]; !ok { + s.FValues[key] = value + return + } + s.FValues[key] += value +} + +// Add a float minimum value. +func (s *AggregateSet) addFloatMin(key string, value float64) { + f, ok := s.FValues[key] + if !ok { + s.FValues[key] = value + return + } + + if f > value { + s.FValues[key] = value + } +} + +// Add a float maximum value. +func (s *AggregateSet) addFloatMax(key string, value float64) { + f, ok := s.FValues[key] + if !ok { + s.FValues[key] = value + return + } + + if f < value { + s.FValues[key] = value + } +} + +// Set a string. +func (s *AggregateSet) setString(key, value string) { + s.SValues[key] = value +} + +// Set a float. +func (s *AggregateSet) setFloat(key string, value float64) { + s.FValues[key] = value +} + +// Aggregate data to the aggregate set. +func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { + var f float64 + + // First check if we can aggregate anything without converting value to float. + switch agg { + case Count: + if clientAggregation { + f, err = strconv.ParseFloat(value, 64) + if err != nil { + return + } + s.addFloat(key, f) + return + } + s.addFloat(key, 1) + return + case Last: + s.setString(key, value) + return + case Len: + s.setString(key, value) + s.setFloat(key, float64(len(value))) + return + default: + } + + // No, we have to convert to float. + f, err = strconv.ParseFloat(value, 64) + if err != nil { + return + } + + switch agg { + case Sum: + fallthrough + case Avg: + s.addFloat(key, f) + case Min: + s.addFloatMin(key, f) + case Max: + s.addFloatMax(key, f) + default: + err = fmt.Errorf("Unknown aggregation method '%v'", agg) + } + return +} |
