summaryrefslogtreecommitdiff
path: root/internal/clients/handlers/maprhandler.go
blob: 9e9a0d1b11f67a274c6967c48c0edfc34a6fce96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package handlers

import (
	"strings"

	"github.com/mimecast/dtail/internal"
	"github.com/mimecast/dtail/internal/io/dlog"
	"github.com/mimecast/dtail/internal/mapr"
	"github.com/mimecast/dtail/internal/mapr/client"
	"github.com/mimecast/dtail/internal/protocol"
)

// MaprHandler is the handler used on the client side for running mapreduce
// aggregations.
type MaprHandler struct {
	baseHandler
	aggregate *client.Aggregate
	query     *mapr.Query
	removedNl bool
}

// NewMaprHandler returns a new mapreduce client handler.
func NewMaprHandler(server string, query *mapr.Query,
	globalGroup *mapr.GlobalGroupSet) *MaprHandler {

	return &MaprHandler{
		baseHandler: baseHandler{
			server:         server,
			shellStarted:   false,
			commands:       make(chan string),
			status:         -1,
			done:           internal.NewDone(),
			capabilities:   make(map[string]struct{}),
			capabilitiesCh: make(chan struct{}),
		},
		query:     query,
		aggregate: client.NewAggregate(server, query, globalGroup),
	}
}

// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
	for _, b := range p {
		switch b {
		case '\n':
			h.removedNl = true
		case protocol.MessageDelimiter:
			message := h.baseHandler.receiveBuf.String()
			dlog.Client.Debug(message)
			if message[0] == 'A' {
				h.handleAggregateMessage(message)
			} else {
				if h.removedNl {
					h.baseHandler.handleMessage(message + "\n")
				} else {
					h.baseHandler.handleMessage(message)
				}
			}
			h.baseHandler.receiveBuf.Reset()
			h.removedNl = false
		default:
			h.baseHandler.receiveBuf.WriteByte(b)
		}
	}

	return len(p), nil
}

// Handle a message received from server including mapr aggregation related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
	parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
	if len(parts) != 3 {
		dlog.Client.Error("Unable to aggregate data", h.server, message, parts,
			len(parts), "expected 3 parts")
		return
	}
	if err := h.aggregate.Aggregate(parts[2]); err != nil {
		dlog.Client.Error("Unable to aggregate data", h.server, message, err)
	}
}