diff options
Diffstat (limited to 'internal/mapr/client/aggregate.go')
| -rw-r--r-- | internal/mapr/client/aggregate.go | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go new file mode 100644 index 0000000..3f2b7a5 --- /dev/null +++ b/internal/mapr/client/aggregate.go @@ -0,0 +1,100 @@ +package client + +import ( + "github.com/mimecast/dtail/internal/logger" + "github.com/mimecast/dtail/internal/mapr" + "strconv" + "strings" +) + +// Aggregate mapreduce data on the DTail client side. +type Aggregate struct { + // This is the mapr query specified on the command line. + query *mapr.Query + // This represents aggregated data of a single remote server. + group *mapr.GroupSet + // This represents the merged aggregated data of all servers. + globalGroup *mapr.GlobalGroupSet + stop chan struct{} + // The server we aggregate the data for (logging and debugging purposes only) + server string +} + +// NewAggregate create new client aggregator. +func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { + return &Aggregate{ + query: query, + group: mapr.NewGroupSet(), + globalGroup: globalGroup, + stop: make(chan struct{}), + server: server, + } +} + +// Aggregate data from mapr log line into local (and global) group sets. +func (a *Aggregate) Aggregate(parts []string) { + select { + case <-a.stop: + logger.Error("Client aggregator stopped for server, not processing new data", a.server) + return + default: + } + + groupKey := parts[0] + samples, err := strconv.Atoi(parts[1]) + if err != nil { + logger.FatalExit(parts, err) + } + fields := a.makeFields(parts[2:]) + set := a.group.GetSet(groupKey) + + var addedSamples bool + for _, sc := range a.query.Select { + if val, ok := fields[sc.FieldStorage]; ok { + if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { + logger.Error(err) + continue + } + addedSamples = true + } + } + if addedSamples { + set.Samples += samples + } + + // Merge data from group into global group. + isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group) + if err != nil { + panic(err) + } + if isMerged { + // Re-init local group (make it empty again). + a.group.InitSet() + } +} + +// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. +func (a *Aggregate) makeFields(parts []string) map[string]string { + fields := make(map[string]string, len(parts)) + + for _, part := range parts { + kv := strings.Split(part, "=") + if len(kv) != 2 { + continue + } + fields[kv[0]] = kv[1] + } + + return fields +} + +// Stop the client side mapreduce aggregator. +func (a *Aggregate) Stop() { + logger.Debug("Stopping client mapreduce aggregator") + close(a.stop) + + err := a.globalGroup.Merge(a.query, a.group) + if err != nil { + panic(err) + } +} |
