summaryrefslogtreecommitdiff
path: root/internal/server/handlers/mapcommand.go
blob: 920c8dd558959863f1d7ac79b43fefab7ec0a1a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package handlers

import (
	"context"
	"strings"

	"github.com/mimecast/dtail/internal/io/dlog"
	"github.com/mimecast/dtail/internal/mapr/server"
)

// mapCommand implements the server side of the mapreduce command.
//
// newMapCommand populates exactly one of aggregate or turboAggregate on
// success; Start dispatches to whichever one is set.
type mapCommand struct {
	// aggregate is the regular aggregate; set when turbo boost is disabled
	// or the handler runs serverless, nil otherwise.
	aggregate      *server.Aggregate
	// turboAggregate is the turbo aggregate; set when turbo boost is not
	// disabled and the handler is not serverless, nil otherwise.
	turboAggregate *server.TurboAggregate
	// server is the handler this command was created for.
	server         *ServerHandler
}

// newMapCommand returns a new server side mapreduce command for serverHandler.
//
// The query string is built by joining args[1:] with single spaces; args[0]
// is skipped (presumably the command name - confirm against the caller).
// argc is kept for signature compatibility and is not used.
//
// Exactly one aggregate is created and returned alongside the command:
//   - a *server.TurboAggregate when turbo boost is not disabled and the
//     handler is not serverless (the regular aggregate return is nil);
//   - a *server.Aggregate otherwise (the turbo aggregate return is nil).
//
// If the aggregate cannot be constructed, the error is returned, both
// aggregate results are nil and the returned command only carries the
// handler reference.
func newMapCommand(serverHandler *ServerHandler, argc int,
	args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) {

	m := mapCommand{server: serverHandler}

	// args[1:] panics on an empty slice, so only slice when there is something
	// after the first element. An empty query is handed to the aggregate
	// constructor as is (expected to be rejected there - TODO confirm).
	var queryStr string
	if len(args) > 1 {
		queryStr = strings.Join(args[1:], " ")
	}

	// Read every config value under a single nil check. Previously only the
	// log format was guarded while TurboBoostDisable was dereferenced
	// unconditionally, so a nil config still caused a panic. Without a config
	// turbo boost keeps its default (enabled).
	defaultLogFormat := ""
	turboBoostDisable := false
	if cfg := serverHandler.serverCfg; cfg != nil {
		defaultLogFormat = cfg.MapreduceLogFormat
		turboBoostDisable = cfg.TurboBoostDisable
	}

	// If turbo boost is not disabled AND we're in server mode (not serverless),
	// create a TurboAggregate. Turbo boost is enabled by default and is a
	// server-side optimization.
	dlog.Server.Debug("MapReduce mode check", "turboBoostDisable", turboBoostDisable, "serverless", serverHandler.serverless)
	if !turboBoostDisable && !serverHandler.serverless {
		dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
		turboAggregate, err := server.NewTurboAggregate(queryStr, defaultLogFormat)
		if err != nil {
			return m, nil, nil, err
		}
		m.turboAggregate = turboAggregate
		return m, nil, turboAggregate, nil
	}

	// Otherwise, create a regular Aggregate.
	aggregate, err := server.NewAggregate(queryStr, defaultLogFormat)
	if err != nil {
		return m, nil, nil, err
	}
	m.aggregate = aggregate
	return m, aggregate, nil, nil
}

// Start hands ctx and the aggregatedMessages channel to the aggregate held by
// this command: the turbo aggregate when one is set, the regular aggregate
// otherwise.
func (m *mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
	// No turbo aggregate configured: fall back to the regular one.
	if m.turboAggregate == nil {
		m.aggregate.Start(ctx, aggregatedMessages)
		return
	}
	m.turboAggregate.Start(ctx, aggregatedMessages)
}