diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
| commit | 7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (patch) | |
| tree | 7340de538cfcc583102aa5697a65801501ec32c4 /internal/io/fs/readfile_processor_optimized.go | |
| parent | 91b83a9ffcabf7264888cf84b95f08b8cc88c832 (diff) | |
task: close compressed readers in file read paths (task 377)
Diffstat (limited to 'internal/io/fs/readfile_processor_optimized.go')
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 97 |
1 file changed, 52 insertions, 45 deletions
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 96d5a16..3e460ed 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -32,23 +32,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, // Use a scanner for efficient line reading scanner := bufio.NewScanner(reader) - + // Get a buffer from the pool instead of allocating a new one bufPtr := pool.GetScannerBuffer() buf := *bufPtr - maxTokenSize := 1024 * 1024 // 1MB max token size + maxTokenSize := 1024 * 1024 // 1MB max token size scanner.Buffer(buf, maxTokenSize) - + // Ensure we return the buffer to the pool when done defer pool.PutScannerBuffer(bufPtr) - + // Use custom split function that preserves line endings scanner.Split(f.scanLinesPreserveEndings) - + // Track truncation checks lastTruncateCheck := time.Now() truncateCheckInterval := 3 * time.Second - + lineCount := 0 for scanner.Scan() { // Check context cancellation @@ -57,7 +57,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, return nil default: } - + // Check for file truncation periodically if time.Since(lastTruncateCheck) > truncateCheckInterval { select { @@ -69,23 +69,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, } lastTruncateCheck = time.Now() } - + // Get the line data lineData := scanner.Bytes() - + // Get a buffer from the pool and copy the data lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(lineData) - + // Process the line f.updatePosition() if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { return err } - + lineCount++ } - + // Check for scanner errors if err := scanner.Err(); err != nil { // Handle EOF specially for tailing @@ -95,7 +95,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, } return err } - + return nil } @@ -105,9 +105,9 @@ func 
(f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in if atEOF && len(data) == 0 { return 0, nil, nil } - + maxLineLen := config.Server.MaxLineLength - + // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { // Check if the line before the newline exceeds max length @@ -116,7 +116,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in // In turbo mode, we handle long lines silently return maxLineLen, data[0:maxLineLen], nil } - + // Line is within limit, include the line ending in the token // Check if there's a \r before the \n if i > 0 && data[i-1] == '\r' { @@ -126,7 +126,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in // Unix line ending (\n) - include it in token return i + 1, data[0 : i+1], nil } - + // If we're at EOF, we have a final, non-terminated line if atEOF { if len(data) > maxLineLen { @@ -136,14 +136,14 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in } return len(data), data, nil } - + // If the line is too long, split it if len(data) >= maxLineLen { // Return a chunk up to MaxLineLength // In turbo mode, we handle long lines silently return maxLineLen, data[0:maxLineLen], nil } - + // Request more data return 0, nil, nil } @@ -153,9 +153,9 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, if atEOF && len(data) == 0 { return 0, nil, nil } - + maxLineLen := config.Server.MaxLineLength - + // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { // Check if the line before the newline exceeds max length @@ -174,7 +174,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, f.warnedAboutLongLine = false // Reset warning for next long line sequence return i + 1, data[0:i], nil } - + // If we're at EOF, we have a final, non-terminated line if atEOF { if len(data) > maxLineLen { @@ -190,7 +190,7 @@ func (f *readFile) 
scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, } return len(data), data, nil } - + // If the line is too long, split it if len(data) >= maxLineLen { // Warn about long line (only once) @@ -199,11 +199,11 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, "Long log line, splitting into multiple lines") + "\n" f.warnedAboutLongLine = true } - + // Return a chunk up to MaxLineLength return maxLineLen, data[0:maxLineLen], nil } - + // Request more data return 0, nil, nil } @@ -213,10 +213,17 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { - reader, fd, err := f.makeReader() + reader, fd, decompressor, err := f.makeReader() if fd != nil { defer fd.Close() } + if decompressor != nil { + defer func() { + if closeErr := decompressor.Close(); closeErr != nil { + dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr) + } + }() + } if err != nil { return err } @@ -224,7 +231,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext // Create a cancelable context for the truncate check goroutine truncateCtx, cancelTruncate := context.WithCancel(ctx) defer cancelTruncate() - + truncate := make(chan struct{}) go f.periodicTruncateCheck(truncateCtx, truncate) @@ -236,7 +243,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext // For cat/grep mode, just read once err = f.readWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re) - + // Ensure any buffered data is flushed if flushErr := processor.Flush(); flushErr != nil && err == nil { err = flushErr @@ -265,45 +272,45 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, // Get a buffer from the pool for reading bufPtr := pool.GetMediumBuffer() defer pool.PutMediumBuffer(bufPtr) - + 
for { // Read available data using pooled buffer buf := (*bufPtr)[:cap(*bufPtr)] // Reset to full capacity n, err := reader.Read(buf) - + if n > 0 { // Process the data we read data := buf[:n] - + // Process complete lines for len(data) > 0 { // Find newline idx := bytes.IndexByte(data, '\n') - + if idx >= 0 { // Complete line found partialLine.Write(data[:idx]) - + // Process the line if it's not empty if partialLine.Len() > 0 { f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { return err } } - + partialLine.Reset() data = data[idx+1:] - + // Reset long line warning f.warnedAboutLongLine = false } else { // No newline, add to partial line partialLine.Write(data) - + // Check if line is too long if partialLine.Len() >= config.Server.MaxLineLength { if !f.warnedAboutLongLine { @@ -311,35 +318,35 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, "Long log line, splitting into multiple lines") + "\n" f.warnedAboutLongLine = true } - + // Process the partial line f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { return err } - + partialLine.Reset() } - + break } } - + // Flush processor periodically if err := processor.Flush(); err != nil { return err } } - + // Handle read errors if err != nil { if err != io.EOF { return err } - + // EOF handling select { case <-ctx.Done(): @@ -352,7 +359,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, // Continue reading after a short delay } } - + // Check for cancellation select { case <-ctx.Done(): |
