diff options
| -rw-r--r-- | internal/clients/maprclient.go | 25 | ||||
| -rw-r--r-- | internal/clients/stats.go | 5 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 19 |
3 files changed, 39 insertions, 10 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 2af038f..95b3a9c 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -145,11 +145,32 @@ func (c MaprClient) makeCommands() (commands []string) { func (c *MaprClient) periodicReportResults(ctx context.Context) { rampUpSleep := c.query.Interval / 2 dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) - time.Sleep(rampUpSleep) + + if rampUpSleep > 0 { + rampUpTimer := time.NewTimer(rampUpSleep) + select { + case <-rampUpTimer.C: + case <-ctx.Done(): + if !rampUpTimer.Stop() { + select { + case <-rampUpTimer.C: + default: + } + } + return + } + } + + interval := c.query.Interval + if interval <= 0 { + interval = time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { - case <-time.After(c.query.Interval): + case <-ticker.C: dlog.Client.Debug("Gathering interim mapreduce result") if err := c.reportResults(false); err != nil { dlog.Client.Error("Unable to gather mapreduce result", err) diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 7a6643b..5880fd1 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -39,6 +39,9 @@ func newTailStats(servers int) *stats { func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + var connectedLast int for { var force bool @@ -48,7 +51,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, case message := <-statsCh: messages = append(messages, message) force = true - case <-time.After(time.Second * 3): + case <-ticker.C: case <-ctx.Done(): return } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 353cda5..98fe817 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -106,9 +106,16 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { } func (a *Aggregate) aggregateTimer(ctx context.Context) { + interval := a.query.Interval + if interval <= 0 { + interval = time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { select { - case <-time.After(a.query.Interval): + case <-ticker.C: a.Serialize(ctx) case <-ctx.Done(): return @@ -139,12 +146,12 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { case newLinesCh := <-a.NextLinesCh: oldLinesCh := a.linesCh a.linesCh = newLinesCh - + // Ensure the old channel is fully drained before recycling to prevent data mixing go func(oldCh chan *line.Line) { // First, drain any remaining lines from the old channel drained := 0 - drainLoop: + drainLoop: for { select { case l, ok := <-oldCh: @@ -161,11 +168,11 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { break drainLoop } } - + if drained > 0 { dlog.Server.Debug("Drained", drained, "lines from recycled channel") } - + // Now safely recycle the drained channel timer := time.NewTimer(5 * time.Second) defer timer.Stop() @@ -342,5 +349,3 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } - - |
