diff options
Diffstat (limited to 'internal/mapr/server/turbo_aggregate.go')
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go index 9b5afe7..188be1c 100644 --- a/internal/mapr/server/turbo_aggregate.go +++ b/internal/mapr/server/turbo_aggregate.go @@ -56,6 +56,21 @@ type rawLine struct { sourceID string } +func (a *TurboAggregate) stopping() bool { + select { + case <-a.done.Done(): + return true + default: + return false + } +} + +func (a *TurboAggregate) stopSerializeTicker() { + if a.serializeTicker != nil { + a.serializeTicker.Stop() + } +} + // NewTurboAggregate returns a new turbo mode aggregator. func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) { query, err := mapr.NewQuery(queryStr) @@ -130,9 +145,7 @@ func (a *TurboAggregate) Shutdown() { a.done.Shutdown() // Stop the ticker - if a.serializeTicker != nil { - a.serializeTicker.Stop() - } + a.stopSerializeTicker() // Wait for active processors to finish for a.activeProcessors.Load() > 0 { @@ -185,6 +198,19 @@ func (a *TurboAggregate) Shutdown() { } } +// Abort stops background processing without waiting for final serialization. +// Session generation replacement uses this to preempt old query work immediately. +func (a *TurboAggregate) Abort() { + dlog.Server.Info("TurboAggregate: Abort called", + "linesProcessed", a.linesProcessed.Load(), + "filesProcessed", a.filesProcessed.Load(), + "activeProcessors", a.activeProcessors.Load(), + "currentGroups", a.countGroups()) + + a.done.Shutdown() + a.stopSerializeTicker() +} + // Start the turbo aggregation. func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) { a.maprMessages = maprMessages @@ -206,6 +232,10 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) // ProcessLineDirect processes a line directly without channels. // This is called from the TurboAggregateProcessor. func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error { + if a.stopping() { + return nil + } + // Increment counter first a.linesProcessed.Add(1) @@ -642,6 +672,11 @@ func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *Turbo // ProcessLine processes a line directly to the turbo aggregate. func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error { + if p.aggregate.stopping() { + pool.RecycleBytesBuffer(lineContent) + return nil + } + // Debug: Log when ProcessLine is called if lineNum == 1 || lineNum%1000 == 0 { dlog.Server.Info("TurboAggregateProcessor: ProcessLine called", @@ -661,6 +696,10 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum // Flush ensures all buffered data is processed. func (p *TurboAggregateProcessor) Flush() error { + if p.aggregate.stopping() { + return nil + } + // Log flush call for debugging dlog.Server.Info("TurboAggregateProcessor: Flush called", "globID", p.globID, |
