1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package handlers
import (
"context"
"strings"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/server"
)
// Map command implements the mapreduce command server side.
type mapCommand struct {
aggregate *server.Aggregate
turboAggregate *server.TurboAggregate
server *ServerHandler
}
// NewMapCommand returns a new server side mapreduce command.
func newMapCommand(serverHandler *ServerHandler, argc int,
args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) {
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
defaultLogFormat := ""
if serverHandler.serverCfg != nil {
defaultLogFormat = serverHandler.serverCfg.MapreduceLogFormat
}
// If turbo boost is not disabled AND we're in server mode (not serverless), create a TurboAggregate
// Turbo boost is enabled by default and is a server-side optimization
dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", serverHandler.serverCfg.TurboBoostDisable, "serverless", serverHandler.serverless)
if !serverHandler.serverCfg.TurboBoostDisable && !serverHandler.serverless {
dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
turboAggregate, err := server.NewTurboAggregate(queryStr, defaultLogFormat)
if err != nil {
return m, nil, nil, err
}
m.turboAggregate = turboAggregate
return m, nil, turboAggregate, nil
}
// Otherwise, create a regular Aggregate
aggregate, err := server.NewAggregate(queryStr, defaultLogFormat)
if err != nil {
return m, nil, nil, err
}
m.aggregate = aggregate
return m, aggregate, nil, nil
}
func (m *mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
if m.turboAggregate != nil {
m.turboAggregate.Start(ctx, aggregatedMessages)
} else {
m.aggregate.Start(ctx, aggregatedMessages)
}
}
|