summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-08 09:34:32 +0200
committerPaul Buetow <paul@buetow.org>2026-03-08 09:34:32 +0200
commitf7f98ccaffc1be88db6f9814fb3c88b5f0a6ea34 (patch)
treeb16c9be77e4d4f9e2fedba83b356e6abf21b6d0e
parent7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (diff)
task: replace looped time.After with tickers (task 378)
-rw-r--r--internal/clients/maprclient.go25
-rw-r--r--internal/clients/stats.go5
-rw-r--r--internal/mapr/server/aggregate.go19
3 files changed, 39 insertions, 10 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 2af038f..95b3a9c 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -145,11 +145,32 @@ func (c MaprClient) makeCommands() (commands []string) {
func (c *MaprClient) periodicReportResults(ctx context.Context) {
rampUpSleep := c.query.Interval / 2
dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
- time.Sleep(rampUpSleep)
+
+ if rampUpSleep > 0 {
+ rampUpTimer := time.NewTimer(rampUpSleep)
+ select {
+ case <-rampUpTimer.C:
+ case <-ctx.Done():
+ if !rampUpTimer.Stop() {
+ select {
+ case <-rampUpTimer.C:
+ default:
+ }
+ }
+ return
+ }
+ }
+
+ interval := c.query.Interval
+ if interval <= 0 {
+ interval = time.Second
+ }
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
for {
select {
- case <-time.After(c.query.Interval):
+ case <-ticker.C:
dlog.Client.Debug("Gathering interim mapreduce result")
if err := c.reportResults(false); err != nil {
dlog.Client.Error("Unable to gather mapreduce result", err)
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 7a6643b..5880fd1 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -39,6 +39,9 @@ func newTailStats(servers int) *stats {
func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{},
statsCh <-chan string, quiet bool) {
+ ticker := time.NewTicker(3 * time.Second)
+ defer ticker.Stop()
+
var connectedLast int
for {
var force bool
@@ -48,7 +51,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{},
case message := <-statsCh:
messages = append(messages, message)
force = true
- case <-time.After(time.Second * 3):
+ case <-ticker.C:
case <-ctx.Done():
return
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 353cda5..98fe817 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -106,9 +106,16 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) {
}
func (a *Aggregate) aggregateTimer(ctx context.Context) {
+ interval := a.query.Interval
+ if interval <= 0 {
+ interval = time.Second
+ }
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+
for {
select {
- case <-time.After(a.query.Interval):
+ case <-ticker.C:
a.Serialize(ctx)
case <-ctx.Done():
return
@@ -139,12 +146,12 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) {
case newLinesCh := <-a.NextLinesCh:
oldLinesCh := a.linesCh
a.linesCh = newLinesCh
-
+
// Ensure the old channel is fully drained before recycling to prevent data mixing
go func(oldCh chan *line.Line) {
// First, drain any remaining lines from the old channel
drained := 0
- drainLoop:
+ drainLoop:
for {
select {
case l, ok := <-oldCh:
@@ -161,11 +168,11 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) {
break drainLoop
}
}
-
+
if drained > 0 {
dlog.Server.Debug("Drained", drained, "lines from recycled channel")
}
-
+
// Now safely recycle the drained channel
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
@@ -342,5 +349,3 @@ func (a *Aggregate) Serialize(ctx context.Context) {
case <-ctx.Done():
}
}
-
-