summaryrefslogtreecommitdiff
path: root/internal/mapr/server/turbo_aggregate.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/mapr/server/turbo_aggregate.go')
-rw-r--r--internal/mapr/server/turbo_aggregate.go45
1 files changed, 42 insertions, 3 deletions
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go
index 9b5afe7..188be1c 100644
--- a/internal/mapr/server/turbo_aggregate.go
+++ b/internal/mapr/server/turbo_aggregate.go
@@ -56,6 +56,21 @@ type rawLine struct {
sourceID string
}
+func (a *TurboAggregate) stopping() bool {
+ select {
+ case <-a.done.Done():
+ return true
+ default:
+ return false
+ }
+}
+
+func (a *TurboAggregate) stopSerializeTicker() {
+ if a.serializeTicker != nil {
+ a.serializeTicker.Stop()
+ }
+}
+
// NewTurboAggregate returns a new turbo mode aggregator.
func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregate, error) {
query, err := mapr.NewQuery(queryStr)
@@ -130,9 +145,7 @@ func (a *TurboAggregate) Shutdown() {
a.done.Shutdown()
// Stop the ticker
- if a.serializeTicker != nil {
- a.serializeTicker.Stop()
- }
+ a.stopSerializeTicker()
// Wait for active processors to finish
for a.activeProcessors.Load() > 0 {
@@ -185,6 +198,19 @@ func (a *TurboAggregate) Shutdown() {
}
}
+// Abort stops background processing without waiting for final serialization.
+// Session generation replacement uses this to preempt old query work immediately.
+func (a *TurboAggregate) Abort() {
+ dlog.Server.Info("TurboAggregate: Abort called",
+ "linesProcessed", a.linesProcessed.Load(),
+ "filesProcessed", a.filesProcessed.Load(),
+ "activeProcessors", a.activeProcessors.Load(),
+ "currentGroups", a.countGroups())
+
+ a.done.Shutdown()
+ a.stopSerializeTicker()
+}
+
// Start the turbo aggregation.
func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) {
a.maprMessages = maprMessages
@@ -206,6 +232,10 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string)
// ProcessLineDirect processes a line directly without channels.
// This is called from the TurboAggregateProcessor.
func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error {
+ if a.stopping() {
+ return nil
+ }
+
// Increment counter first
a.linesProcessed.Add(1)
@@ -642,6 +672,11 @@ func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *Turbo
// ProcessLine processes a line directly to the turbo aggregate.
func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ if p.aggregate.stopping() {
+ pool.RecycleBytesBuffer(lineContent)
+ return nil
+ }
+
// Debug: Log when ProcessLine is called
if lineNum == 1 || lineNum%1000 == 0 {
dlog.Server.Info("TurboAggregateProcessor: ProcessLine called",
@@ -661,6 +696,10 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum
// Flush ensures all buffered data is processed.
func (p *TurboAggregateProcessor) Flush() error {
+ if p.aggregate.stopping() {
+ return nil
+ }
+
// Log flush call for debugging
dlog.Server.Info("TurboAggregateProcessor: Flush called",
"globID", p.globID,