summaryrefslogtreecommitdiff
path: root/internal/io/fs/readfile_processor_optimized.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-30 23:22:56 +0300
committerPaul Buetow <paul@buetow.org>2025-06-30 23:22:56 +0300
commitb4ca43d97c83c3b9da7138b3b4d6f6cce6fed370 (patch)
tree1dba534b8c7b1784f712cec90ff945e3d7fb7a82 /internal/io/fs/readfile_processor_optimized.go
parent88886206c2c758bf619362aaa484dd3e254b8ed1 (diff)
fix: ensure complete data transmission in turbo mode for dtail operations
This commit fixes integration test failures in turbo mode where data was not being fully transmitted before the connection closed. The main issue was that readWithTurboProcessor was returning too quickly without ensuring all data had been written to the network stream. Key changes: - Add comprehensive trace logging to track data flow in turbo mode - Fix turbo channel draining mechanism in baseHandler.Read() to wait for all data - Add proper flushing in TurboNetworkWriter with channel drain synchronization - Increase flush timeout from 10 to 100 iterations for turbo mode data volumes - Fix color formatting in serverless mode by processing lines individually - Add synchronization delays to ensure data transmission completes The fixes ensure that all data is properly transmitted before connection closure, resolving TestDcat integration test failures when DTAIL_TURBOBOOST_ENABLE is set. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/io/fs/readfile_processor_optimized.go')
-rw-r--r--internal/io/fs/readfile_processor_optimized.go76
1 file changed, 67 insertions, 9 deletions
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 03bece8..8874753 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -33,17 +33,20 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
// Use a scanner for efficient line reading
scanner := bufio.NewScanner(reader)
- // Set a custom buffer size for the scanner (default is 64KB, we'll use 256KB)
- buf := make([]byte, 256*1024)
- scanner.Buffer(buf, config.Server.MaxLineLength)
+ // Set a custom buffer size for the scanner (default is 64KB, we'll use 1MB)
+ // The second parameter is the maximum token size, not the buffer size
+ buf := make([]byte, 1024*1024) // 1MB buffer
+ maxTokenSize := 1024 * 1024 // 1MB max token size
+ scanner.Buffer(buf, maxTokenSize)
- // Custom split function to handle lines up to MaxLineLength
- scanner.Split(f.scanLinesWithMaxLength)
+ // Use custom split function that preserves line endings
+ scanner.Split(f.scanLinesPreserveEndings)
// Track truncation checks
lastTruncateCheck := time.Now()
truncateCheckInterval := 3 * time.Second
+ lineCount := 0
for scanner.Scan() {
// Check context cancellation
select {
@@ -76,6 +79,8 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
+
+ lineCount++
}
// Check for scanner errors
@@ -91,6 +96,55 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
return nil
}
+// scanLinesPreserveEndings is a custom split function that preserves original line endings
+// and respects MaxLineLength
+func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance int, token []byte, err error) {
+ if atEOF && len(data) == 0 {
+ return 0, nil, nil
+ }
+
+ maxLineLen := config.Server.MaxLineLength
+
+ // Look for a newline
+ if i := bytes.IndexByte(data, '\n'); i >= 0 {
+ // Check if the line before the newline exceeds max length
+ if i > maxLineLen {
+ // Line is too long, split it at maxLineLen
+ // In turbo mode, we handle long lines silently
+ return maxLineLen, data[0:maxLineLen], nil
+ }
+
+ // Line is within limit, include the line ending in the token
+ // Check if there's a \r before the \n
+ if i > 0 && data[i-1] == '\r' {
+ // Windows line ending (\r\n) - include both in token
+ return i + 1, data[0 : i+1], nil
+ }
+ // Unix line ending (\n) - include it in token
+ return i + 1, data[0 : i+1], nil
+ }
+
+ // If we're at EOF, we have a final, non-terminated line
+ if atEOF {
+ if len(data) > maxLineLen {
+ // Even at EOF, respect max line length
+ // In turbo mode, we handle long lines silently
+ return maxLineLen, data[0:maxLineLen], nil
+ }
+ return len(data), data, nil
+ }
+
+ // If the line is too long, split it
+ if len(data) >= maxLineLen {
+ // Return a chunk up to MaxLineLength
+ // In turbo mode, we handle long lines silently
+ return maxLineLen, data[0:maxLineLen], nil
+ }
+
+ // Request more data
+ return 0, nil, nil
+}
+
// scanLinesWithMaxLength is a custom split function for bufio.Scanner that respects MaxLineLength
func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
@@ -105,8 +159,10 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
if i > maxLineLen {
// Line is too long, split it at maxLineLen
if !f.warnedAboutLongLine {
- f.serverMessages <- dlog.Common.Warn(f.filePath,
- "Long log line, splitting into multiple lines") + "\n"
+ if f.serverMessages != nil {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines") + "\n"
+ }
f.warnedAboutLongLine = true
}
return maxLineLen, data[0:maxLineLen], nil
@@ -121,8 +177,10 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
if len(data) > maxLineLen {
// Even at EOF, respect max line length
if !f.warnedAboutLongLine {
- f.serverMessages <- dlog.Common.Warn(f.filePath,
- "Long log line, splitting into multiple lines") + "\n"
+ if f.serverMessages != nil {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines") + "\n"
+ }
f.warnedAboutLongLine = true
}
return maxLineLen, data[0:maxLineLen], nil