summaryrefslogtreecommitdiff
path: root/internal/mapr/globalgroupset.go
blob: 2b1289898c7a1fc2f7a553811de216c750f75b7a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package mapr

import (
	"fmt"
)

// GlobalGroupSet is used on the dtail client to merge multiple group sets
// (one group set per remote server) to one single global group set.
//
// Merge, MergeNoblock, NumSets, SwapOut, WriteResult and Result acquire the
// semaphore before touching the embedded GroupSet; String does not.
type GlobalGroupSet struct {
	// GroupSet is embedded and holds the merged data.
	GroupSet
	// semaphore is used as a lock: sending a value acquires it and receiving
	// a value releases it. NewGlobalGroupSet creates it with capacity 1.
	semaphore chan struct{}
}

// NewGlobalGroupSet returns a new, empty global group set. The semaphore is
// created with capacity 1 and the embedded group set is initialised through
// InitSet.
func NewGlobalGroupSet() *GlobalGroupSet {
	global := &GlobalGroupSet{semaphore: make(chan struct{}, 1)}
	global.InitSet()
	return global
}

// String returns the string form of the embedded group set wrapped in
// "GlobalGroupSet(...)". It does not acquire the semaphore.
func (g *GlobalGroupSet) String() string {
	inner := g.GroupSet.String()
	return fmt.Sprintf("GlobalGroupSet(%s)", inner)
}

// Merge merges a group set into the global group set. It blocks until the
// semaphore can be acquired, and releases it again when the merge returns.
// The returned error is whatever the internal merge reports.
func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error {
	g.semaphore <- struct{}{} // acquire
	release := func() { <-g.semaphore }
	defer release()

	err := g.merge(query, group)
	return err
}

// MergeNoblock merges a group set into the global group set without waiting
// for the semaphore.
//
// If the semaphore is currently held elsewhere, nothing is merged and the
// method returns (false, nil) immediately. Otherwise the merge is attempted
// and the method returns true together with the merge's error (nil on
// success).
func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) {
	select {
	case g.semaphore <- struct{}{}:
		// Acquired; fall through to the merge below.
	default:
		return false, nil
	}
	// Release via defer, like the other locking methods, so that a panic
	// inside merge cannot leave the semaphore held and block later callers.
	defer func() { <-g.semaphore }()

	return true, g.merge(query, group)
}

// merge folds every set of group into the set stored under the same group
// key in the global group set (looked up through GetSet). It does no locking
// itself; Merge and MergeNoblock acquire the semaphore before calling it.
// The first error reported by a set merge aborts the loop and is returned.
func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
	for key, incoming := range group.sets {
		if err := g.GetSet(key).Merge(query, incoming); err != nil {
			return err
		}
	}
	return nil
}

// IsEmpty reports whether the global group set currently holds no sets.
// It relies on NumSets, which acquires the semaphore.
func (g *GlobalGroupSet) IsEmpty() bool {
	count := g.NumSets()
	return count == 0
}

// NumSets returns the number of sets currently held, read while holding the
// semaphore.
func (g *GlobalGroupSet) NumSets() int {
	g.semaphore <- struct{}{}
	defer func() {
		<-g.semaphore
	}()

	count := len(g.sets)
	return count
}

// SwapOut returns the underlying group set and creates a new empty one, so
// that the global group set is empty again and can aggregate new data.
// The swap happens while holding the semaphore.
func (g *GlobalGroupSet) SwapOut() *GroupSet {
	g.semaphore <- struct{}{}
	defer func() { <-g.semaphore }()

	// Hand the current sets over to a fresh GroupSet before re-initialising.
	detached := new(GroupSet)
	detached.sets = g.sets
	g.InitSet()
	return detached
}

// WriteResult writes the result of a mapreduce aggregation to an outfile.
// It holds the semaphore while delegating to the embedded GroupSet's
// WriteResult and returns that call's error unchanged.
func (g *GlobalGroupSet) WriteResult(query *Query, finalResult bool) error {
	g.semaphore <- struct{}{}
	defer func() {
		<-g.semaphore
	}()

	err := g.GroupSet.WriteResult(query, finalResult)
	return err
}

// Result returns the result of the mapreduce aggregation as a string.
// It holds the semaphore while delegating to the embedded GroupSet's Result
// and passes that call's three return values through unchanged.
func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) {
	g.semaphore <- struct{}{}
	defer func() {
		<-g.semaphore
	}()

	text, n, err := g.GroupSet.Result(query, rowsLimit)
	return text, n, err
}