diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 17:58:06 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 17:58:06 +0300 |
| commit | 859be4593e4f7ef37ff2c91dc90f42e6930a3996 (patch) | |
| tree | a73597068c3e5f34017d4e348267f8051f3be614 /internal/server/handlers/basehandler.go | |
| parent | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (diff) | |
fix: improve turbo mode MapReduce batch processing and shutdown sequence
- Fixed batch processor to use synchronous processing during shutdown
- Added processBatchAndWait method for guaranteed batch completion
- Fixed Flush() to ensure all data is processed before file completion
- Improved parser selection logic for table-based queries
- Added extensive debug logging for troubleshooting
- Increased wait times for serialization during shutdown
These changes address data loss issues when processing multiple files
concurrently in turbo mode. The batch processor now properly flushes
all remaining data when files complete and during shutdown.
Note: Integration tests still failing due to SSH authentication issues
in test environment, but core turbo mode logic has been fixed.
🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/basehandler.go')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 427ab6c..3bb824b 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -409,6 +409,20 @@ func (h *baseHandler) shutdown() { h.flushTurboData() } + // Shutdown aggregates BEFORE flush to ensure MapReduce data is available + if h.turboAggregate != nil { + dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()") + h.turboAggregate.Shutdown() + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + if h.aggregate != nil { + dlog.Server.Info(h.user, "Shutting down regular aggregate in shutdown()") + h.aggregate.Shutdown() + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + h.flush() go func() { |
