1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
package handlers
import (
"strings"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/client"
"github.com/mimecast/dtail/internal/protocol"
)
// MaprHandler is the handler used on the client side for running mapreduce
// aggregations.
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
removedNl bool
}
// NewMaprHandler returns a new mapreduce client handler.
func NewMaprHandler(server string, session *client.SessionState) *MaprHandler {
return &MaprHandler{
baseHandler: baseHandler{
server: server,
shellStarted: false,
commands: make(chan string),
status: -1,
done: internal.NewDone(),
capabilities: make(map[string]struct{}),
capabilitiesCh: make(chan struct{}),
sessionAcks: make(chan SessionAck, 4),
},
aggregate: client.NewAggregate(server, session),
}
}
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
switch b {
case '\n':
h.removedNl = true
case protocol.MessageDelimiter:
message := h.baseHandler.receiveBuf.String()
dlog.Client.Debug(message)
if message[0] == 'A' {
h.handleAggregateMessage(message)
} else {
if h.removedNl {
h.baseHandler.handleMessage(message + "\n")
} else {
h.baseHandler.handleMessage(message)
}
}
h.baseHandler.receiveBuf.Reset()
h.removedNl = false
default:
h.baseHandler.receiveBuf.WriteByte(b)
}
}
return len(p), nil
}
// Handle a message received from server including mapr aggregation related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
if len(parts) != 3 {
dlog.Client.Error("Unable to aggregate data", h.server, message, parts,
len(parts), "expected 3 parts")
return
}
if err := h.aggregate.Aggregate(parts[2]); err != nil {
dlog.Client.Error("Unable to aggregate data", h.server, message, err)
}
}
|