diff options
Diffstat (limited to 'internal/clients/handlers/maprhandler.go')
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go new file mode 100644 index 0000000..d76cdfd --- /dev/null +++ b/internal/clients/handlers/maprhandler.go @@ -0,0 +1,74 @@ +package handlers + +import ( + "github.com/mimecast/dtail/internal/logger" + "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/mapr/client" + "strings" +) + +// MaprHandler is the handler used on the client side for running mapreduce aggregations. +type MaprHandler struct { + baseHandler + aggregate *client.Aggregate + query *mapr.Query + count uint64 +} + +// NewMaprHandler returns a new mapreduce client handler. +func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler { + return &MaprHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + pong: make(chan struct{}, 1), + stop: make(chan struct{}), + pingTimeout: pingTimeout, + }, + query: query, + aggregate: client.NewAggregate(server, query, globalGroup), + } +} + +// Read data from the dtail server via Writer interface. +func (h *MaprHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) + if b == '\n' { + if len(h.baseHandler.receiveBuf) == 0 { + continue + } + message := string(h.baseHandler.receiveBuf) + + if h.baseHandler.receiveBuf[0] == 'A' { + h.handleAggregateMessage(strings.TrimSpace(message)) + h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] + continue + } + h.baseHandler.handleMessageType(message) + } + } + + return len(p), nil +} + +// Handle a message received from server including mapr aggregation +// related data. +func (h *MaprHandler) handleAggregateMessage(message string) { + h.count++ + parts := strings.Split(message, "|") + + // Index 0 contains 'AGGREGATE', 1 contains server host. + // Aggregation data begins from index 2. + logger.Debug("Received aggregate data", h.server, h.count) + h.aggregate.Aggregate(parts[2:]) + logger.Debug("Aggregated aggregate data", h.server, h.count) +} + +// Stop stops the mapreduce client handler. +func (h *MaprHandler) Stop() { + logger.Debug("Stopping mapreduce handler", h.server) + h.aggregate.Stop() + h.baseHandler.Stop() +} |
