1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
package mapr
import (
"fmt"
)
// GlobalGroupSet is used on the dtail client to merge multiple group sets
// (one group set per remote server) into one single global group set.
//
// The exported methods of GlobalGroupSet that read or modify the embedded
// GroupSet acquire the semaphore first and release it afterwards, with the
// exception of String, which reads the embedded set without acquiring it.
type GlobalGroupSet struct {
// GroupSet is embedded, so its fields (e.g. sets) and methods are promoted
// to GlobalGroupSet unless a method of the same name is declared here.
GroupSet
// semaphore guards access to the embedded GroupSet. NewGlobalGroupSet
// creates it with a capacity of 1: sending a value acquires it, receiving
// a value releases it.
semaphore chan struct{}
}
// NewGlobalGroupSet returns a pointer to a freshly initialised, empty global
// group set. The semaphore is created with capacity 1 so that at most one
// goroutine at a time can hold it.
func NewGlobalGroupSet() *GlobalGroupSet {
	global := &GlobalGroupSet{semaphore: make(chan struct{}, 1)}
	// InitSet is promoted from the embedded GroupSet; presumably it allocates
	// the underlying sets container (defined outside this file).
	global.InitSet()
	return global
}
// String returns the string representation of the global group set: the
// embedded GroupSet's own String output wrapped in "GlobalGroupSet(...)".
//
// Note that, unlike the other accessors of this type, String does not acquire
// the semaphore before reading the embedded set.
func (g *GlobalGroupSet) String() string {
	inner := g.GroupSet.String()
	return fmt.Sprintf("GlobalGroupSet(%s)", inner)
}
// Merge merges group into the global group set. It blocks until the
// semaphore can be acquired, and returns whatever error the merge produced.
func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error {
	// Acquire: the send blocks while another goroutine holds the semaphore.
	g.semaphore <- struct{}{}
	defer func() {
		// Release on every exit path.
		<-g.semaphore
	}()

	return g.merge(query, group)
}
// MergeNoblock merges group into the global group set without blocking.
//
// If the semaphore is currently held by another goroutine, nothing is merged
// and (false, nil) is returned. Otherwise the merge is performed and
// (true, err) is returned, where err is the result of the merge.
func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) {
	select {
	case g.semaphore <- struct{}{}:
		// Acquired; fall through to the merge below.
	default:
		// Somebody else holds the semaphore: give up immediately.
		return false, nil
	}
	// Release via defer (as Merge does) so that a panic inside merge cannot
	// leave the semaphore held and block every subsequent caller.
	defer func() { <-g.semaphore }()

	return true, g.merge(query, group)
}
// merge merges every set of group into the set with the same group key of
// the global group set. It does no locking itself; Merge and MergeNoblock
// acquire the semaphore before calling it.
//
// The first error returned by a per-key merge aborts the loop and is returned
// as is; sets merged before that point stay merged.
func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
	for key, incoming := range group.sets {
		// GetSet is promoted from the embedded GroupSet; presumably it creates
		// the target set when the key is not known yet (defined elsewhere).
		if err := g.GetSet(key).Merge(query, incoming); err != nil {
			return err
		}
	}
	return nil
}
// IsEmpty reports whether the global group set currently holds no sets.
//
// It delegates to NumSets, which acquires the semaphore, so it must not be
// called by a goroutine that already holds the semaphore.
func (g *GlobalGroupSet) IsEmpty() bool {
	count := g.NumSets()
	return count == 0
}
// NumSets returns the number of sets currently held by the global group set.
// The count is read while holding the semaphore.
func (g *GlobalGroupSet) NumSets() int {
	g.semaphore <- struct{}{}
	count := len(g.sets)
	<-g.semaphore
	return count
}
// SwapOut returns the underlying group set and re-initialises the global
// group set, so that the global group set is empty again and can aggregate
// new data. The swap happens while holding the semaphore.
func (g *GlobalGroupSet) SwapOut() *GroupSet {
	g.semaphore <- struct{}{}
	defer func() { <-g.semaphore }()

	// Hand the current sets over to a new GroupSet before re-initialising.
	detached := new(GroupSet)
	detached.sets = g.sets
	g.InitSet()
	return detached
}
// WriteResult writes the result of a mapreduce aggregation to an outfile by
// delegating to the embedded GroupSet's WriteResult while holding the
// semaphore. query and finalResult are passed through unchanged, and the
// delegate's error is returned as is.
func (g *GlobalGroupSet) WriteResult(query *Query, finalResult bool) error {
	g.semaphore <- struct{}{}
	defer func() { <-g.semaphore }()

	err := g.GroupSet.WriteResult(query, finalResult)
	return err
}
// Result returns the result of the mapreduce aggregation as a string by
// delegating to the embedded GroupSet's Result while holding the semaphore.
// query and rowsLimit are passed through unchanged, and all three return
// values come straight from the delegate (the int is presumably a row count;
// see GroupSet.Result).
func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) {
	g.semaphore <- struct{}{}
	defer func() { <-g.semaphore }()

	text, count, err := g.GroupSet.Result(query, rowsLimit)
	return text, count, err
}
|