diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
| commit | 7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (patch) | |
| tree | 7340de538cfcc583102aa5697a65801501ec32c4 /internal/io | |
| parent | 91b83a9ffcabf7264888cf84b95f08b8cc88c832 (diff) | |
task: close compressed readers in file read paths (task 377)
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/readfile.go | 28 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 55 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 97 |
3 files changed, 102 insertions, 78 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index fab933b..ee486bc 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -76,10 +76,17 @@ func (f readFile) Retry() bool { func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error { - reader, fd, err := f.makeReader() + reader, fd, decompressor, err := f.makeReader() if fd != nil { defer fd.Close() } + if decompressor != nil { + defer func() { + if closeErr := decompressor.Close(); closeErr != nil { + dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr) + } + }() + } if err != nil { return err } @@ -109,14 +116,14 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, return err } -func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) { +func (f *readFile) makeReader() (*bufio.Reader, *os.File, io.Closer, error) { if f.filePath == "" && f.globID == "-" { return f.makePipeReader() } return f.makeFileReader() } -func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, err error) { +func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, decompressor io.Closer, err error) { if fd, err = os.Open(f.filePath); err != nil { return } @@ -127,18 +134,18 @@ func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, err erro } } - reader, err = f.makeCompressedFileReader(fd) + reader, decompressor, err = f.makeCompressedFileReader(fd) return } -func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) { - return bufio.NewReader(os.Stdin), nil, nil +func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, io.Closer, error) { + return bufio.NewReader(os.Stdin), nil, nil, nil } func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan<- struct{}) { ticker := time.NewTicker(time.Second * 3) defer ticker.Stop() - + for { select { case <-ticker.C: @@ -153,7 +160,7 @@ func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan<- st } } -func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { +func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, decompressor io.Closer, err error) { switch { case strings.HasSuffix(f.FilePath(), ".gz"): fallthrough @@ -164,10 +171,13 @@ func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, if err != nil { return } + decompressor = gzipReader reader = bufio.NewReader(gzipReader) case strings.HasSuffix(f.FilePath(), ".zst"): dlog.Common.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) + zstdReader := zstd.NewReader(fd) + decompressor = zstdReader + reader = bufio.NewReader(zstdReader) default: reader = bufio.NewReader(fd) } diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index 4e636a4..94a28dc 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -21,10 +21,17 @@ import ( func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { - reader, fd, err := f.makeReader() + reader, fd, decompressor, err := f.makeReader() if fd != nil { defer fd.Close() } + if decompressor != nil { + defer func() { + if closeErr := decompressor.Close(); closeErr != nil { + dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr) + } + }() + } if err != nil { return err } @@ -38,7 +45,7 @@ func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext // Process file with direct callbacks instead of channels err = f.readWithProcessor(ctx, fd, reader, truncate, ltx, processor, re) - + // Ensure any buffered data is flushed if flushErr := processor.Flush(); flushErr != nil && err == nil { err = flushErr @@ -100,7 +107,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, if err := processor.ProcessFilteredLine(message); err != nil { return abortReading } - + f.warnedAboutLongLine = false return continueReading @@ -113,7 +120,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, } // Force a line break message.WriteByte('\n') - + // Process the line f.updatePosition() if err := processor.ProcessFilteredLine(message); err != nil { @@ -164,19 +171,19 @@ type filteringProcessor struct { ltx lcontext.LContext stats *stats globID string - + // For local context handling - beforeBuf []*bytes.Buffer - afterCount int - maxCount int - maxReached bool + beforeBuf []*bytes.Buffer + afterCount int + maxCount int + maxReached bool } // ProcessFilteredLine applies regex filtering before passing to the underlying processor func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { // Update stats lineNum := fp.stats.totalLineCount() - + // Simple case: no local context if !fp.ltx.Has() { if !fp.re.Match(rawLine.Bytes()) { @@ -185,10 +192,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { pool.RecycleBytesBuffer(rawLine) return nil } - + fp.stats.updateLineMatched() fp.stats.updateLineTransmitted() - + // Process the line err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID) if err != nil { @@ -196,7 +203,7 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { } return err } - + // Complex case: handle local context (before/after/max) return fp.processWithContext(rawLine, lineNum) } @@ -204,10 +211,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { // processWithContext handles lines when local context is enabled func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum uint64) error { matched := fp.re.Match(rawLine.Bytes()) - + if !matched { fp.stats.updateLineNotMatched() - + // Handle after context if fp.ltx.AfterContext > 0 && fp.afterCount > 0 { fp.afterCount-- @@ -218,7 +225,7 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum } return err } - + // Handle before context buffer if fp.ltx.BeforeContext > 0 { // Add to before buffer @@ -231,20 +238,20 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum } else { pool.RecycleBytesBuffer(rawLine) } - + fp.stats.updateLineNotTransmitted() return nil } - + // Line matched fp.stats.updateLineMatched() - + // Check if we've reached max count if fp.maxReached { pool.RecycleBytesBuffer(rawLine) return io.EOF // Stop processing } - + // Process before context if fp.ltx.BeforeContext > 0 && len(fp.beforeBuf) > 0 { for i, buf := range fp.beforeBuf { @@ -260,14 +267,14 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum } fp.beforeBuf = fp.beforeBuf[:0] // Clear the buffer } - + // Process the matched line fp.stats.updateLineTransmitted() if err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID); err != nil { pool.RecycleBytesBuffer(rawLine) return err } - + // Update max count if fp.ltx.MaxCount > 0 { fp.maxCount++ @@ -278,11 +285,11 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum fp.maxReached = true } } - + // Reset after context if fp.ltx.AfterContext > 0 { fp.afterCount = fp.ltx.AfterContext } - + return nil } diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go index 96d5a16..3e460ed 100644 --- a/internal/io/fs/readfile_processor_optimized.go +++ b/internal/io/fs/readfile_processor_optimized.go @@ -32,23 +32,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, // Use a scanner for efficient line reading scanner := bufio.NewScanner(reader) - + // Get a buffer from the pool instead of allocating a new one bufPtr := pool.GetScannerBuffer() buf := *bufPtr - maxTokenSize := 1024 * 1024 // 1MB max token size + maxTokenSize := 1024 * 1024 // 1MB max token size scanner.Buffer(buf, maxTokenSize) - + // Ensure we return the buffer to the pool when done defer pool.PutScannerBuffer(bufPtr) - + // Use custom split function that preserves line endings scanner.Split(f.scanLinesPreserveEndings) - + // Track truncation checks lastTruncateCheck := time.Now() truncateCheckInterval := 3 * time.Second - + lineCount := 0 for scanner.Scan() { // Check context cancellation @@ -57,7 +57,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, return nil default: } - + // Check for file truncation periodically if time.Since(lastTruncateCheck) > truncateCheckInterval { select { @@ -69,23 +69,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, } lastTruncateCheck = time.Now() } - + // Get the line data lineData := scanner.Bytes() - + // Get a buffer from the pool and copy the data lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(lineData) - + // Process the line f.updatePosition() if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { return err } - + lineCount++ } - + // Check for scanner errors if err := scanner.Err(); err != nil { // Handle EOF specially for tailing @@ -95,7 +95,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, } return err } - + return nil } @@ -105,9 +105,9 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in if atEOF && len(data) == 0 { return 0, nil, nil } - + maxLineLen := config.Server.MaxLineLength - + // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { // Check if the line before the newline exceeds max length @@ -116,7 +116,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in // In turbo mode, we handle long lines silently return maxLineLen, data[0:maxLineLen], nil } - + // Line is within limit, include the line ending in the token // Check if there's a \r before the \n if i > 0 && data[i-1] == '\r' { @@ -126,7 +126,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in // Unix line ending (\n) - include it in token return i + 1, data[0 : i+1], nil } - + // If we're at EOF, we have a final, non-terminated line if atEOF { if len(data) > maxLineLen { @@ -136,14 +136,14 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in } return len(data), data, nil } - + // If the line is too long, split it if len(data) >= maxLineLen { // Return a chunk up to MaxLineLength // In turbo mode, we handle long lines silently return maxLineLen, data[0:maxLineLen], nil } - + // Request more data return 0, nil, nil } @@ -153,9 +153,9 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, if atEOF && len(data) == 0 { return 0, nil, nil } - + maxLineLen := config.Server.MaxLineLength - + // Look for a newline if i := bytes.IndexByte(data, '\n'); i >= 0 { // Check if the line before the newline exceeds max length @@ -174,7 +174,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, f.warnedAboutLongLine = false // Reset warning for next long line sequence return i + 1, data[0:i], nil } - + // If we're at EOF, we have a final, non-terminated line if atEOF { if len(data) > maxLineLen { @@ -190,7 +190,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, } return len(data), data, nil } - + // If the line is too long, split it if len(data) >= maxLineLen { // Warn about long line (only once) @@ -199,11 +199,11 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, "Long log line, splitting into multiple lines") + "\n" f.warnedAboutLongLine = true } - + // Return a chunk up to MaxLineLength return maxLineLen, data[0:maxLineLen], nil } - + // Request more data return 0, nil, nil } @@ -213,10 +213,17 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { - reader, fd, err := f.makeReader() + reader, fd, decompressor, err := f.makeReader() if fd != nil { defer fd.Close() } + if decompressor != nil { + defer func() { + if closeErr := decompressor.Close(); closeErr != nil { + dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr) + } + }() + } if err != nil { return err } @@ -224,7 +231,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext // Create a cancelable context for the truncate check goroutine truncateCtx, cancelTruncate := context.WithCancel(ctx) defer cancelTruncate() - + truncate := make(chan struct{}) go f.periodicTruncateCheck(truncateCtx, truncate) @@ -236,7 +243,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext // For cat/grep mode, just read once err = f.readWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re) - + // Ensure any buffered data is flushed if flushErr := processor.Flush(); flushErr != nil && err == nil { err = flushErr @@ -265,45 +272,45 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, // Get a buffer from the pool for reading bufPtr := pool.GetMediumBuffer() defer pool.PutMediumBuffer(bufPtr) - + for { // Read available data using pooled buffer buf := (*bufPtr)[:cap(*bufPtr)] // Reset to full capacity n, err := reader.Read(buf) - + if n > 0 { // Process the data we read data := buf[:n] - + // Process complete lines for len(data) > 0 { // Find newline idx := bytes.IndexByte(data, '\n') - + if idx >= 0 { // Complete line found partialLine.Write(data[:idx]) - + // Process the line if it's not empty if partialLine.Len() > 0 { f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { return err } } - + partialLine.Reset() data = data[idx+1:] - + // Reset long line warning f.warnedAboutLongLine = false } else { // No newline, add to partial line partialLine.Write(data) - + // Check if line is too long if partialLine.Len() >= config.Server.MaxLineLength { if !f.warnedAboutLongLine { @@ -311,35 +318,35 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, "Long log line, splitting into multiple lines") + "\n" f.warnedAboutLongLine = true } - + // Process the partial line f.updatePosition() lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) lineBuf.Write(partialLine.Bytes()) - + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { return err } - + partialLine.Reset() } - + break } } - + // Flush processor periodically if err := processor.Flush(); err != nil { return err } } - + // Handle read errors if err != nil { if err != io.EOF { return err } - + // EOF handling select { case <-ctx.Done(): @@ -352,7 +359,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, // Continue reading after a short delay } } - + // Check for cancellation select { case <-ctx.Done(): |
