summaryrefslogtreecommitdiff
path: root/internal/clients/handlers/maprhandler.go
blob: b908f3b133323d18d7b4be79086d56c6f78966cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package handlers

import (
	"strings"

	"github.com/mimecast/dtail/internal/io/logger"
	"github.com/mimecast/dtail/internal/mapr"
	"github.com/mimecast/dtail/internal/mapr/client"
)

// MaprHandler is the handler used on the client side for running mapreduce aggregations.
// It embeds baseHandler and adds the state required to aggregate the
// mapreduce data received from one server.
type MaprHandler struct {
	baseHandler
	aggregate *client.Aggregate // receives the data fields of each aggregate message
	query     *mapr.Query       // query passed to NewMaprHandler
	count     uint64            // aggregate messages handled so far
}

// NewMaprHandler creates the client side mapreduce handler for the given
// server. The query and the global group set are handed over to a new
// client.Aggregate which is stored in the returned handler.
func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler {
	h := &MaprHandler{query: query}

	// shellStarted is left at its zero value (false).
	h.baseHandler = baseHandler{
		server:     server,
		commands:   make(chan string),
		status:     -1,
		withCancel: withCancel{done: make(chan struct{})},
	}
	h.aggregate = client.NewAggregate(server, query, globalGroup)

	return h
}

// Write consumes data received from the dtail server; its signature matches
// the io.Writer interface. The bytes are collected in the receive buffer until
// a newline completes a line. Lines starting with 'A' are treated as mapreduce
// aggregate messages, all other lines are passed to the base handler.
//
// Write always reports all of p as written and never returns an error.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
	for _, b := range p {
		h.receiveBuf = append(h.receiveBuf, b)
		if b != '\n' {
			continue
		}

		// The buffer holds a complete line here, including the newline just
		// appended, so it can never be empty and indexing [0] is safe.
		if h.receiveBuf[0] == 'A' {
			line := string(h.receiveBuf)
			h.handleAggregateMessage(strings.TrimSpace(line))
			h.receiveBuf = h.receiveBuf[:0]
			continue
		}

		// NOTE(review): the buffer is not reset on this path; presumably
		// handleMessageType does that itself - confirm in baseHandler.
		h.baseHandler.handleMessageType(string(h.receiveBuf))
	}

	return len(p), nil
}

// handleAggregateMessage handles a message received from the server which
// includes mapr aggregation related data. The message is split at the "➔"
// separator and all fields from index 2 onwards are passed to the aggregate.
//
// Messages with fewer than three fields carry no aggregation data. They are
// logged and dropped, as slicing them from index 2 could otherwise panic.
// Every message, malformed or not, increments the handler's message count.
func (h *MaprHandler) handleAggregateMessage(message string) {
	h.count++
	parts := strings.Split(message, "➔")

	// Index 0 contains 'AGGREGATE', 1 contains server host.
	// Aggregation data begins from index 2.
	if len(parts) < 3 {
		logger.Debug("Ignoring malformed aggregate message", h.server, h.count, parts)
		return
	}

	logger.Debug("Received aggregate data", h.server, h.count, parts)
	h.aggregate.Aggregate(parts[2:])
	logger.Debug("Aggregated aggregate data", h.server, h.count)
}