summary refs log tree commit diff
path: root/internal/io
diff options
context:
space:
mode:
author    Paul Buetow <paul@buetow.org> 2026-03-08 09:33:22 +0200
committer Paul Buetow <paul@buetow.org> 2026-03-08 09:33:22 +0200
commit    7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (patch)
tree      7340de538cfcc583102aa5697a65801501ec32c4 /internal/io
parent    91b83a9ffcabf7264888cf84b95f08b8cc88c832 (diff)
task: close compressed readers in file read paths (task 377)
Diffstat (limited to 'internal/io')
-rw-r--r--  internal/io/fs/readfile.go                     | 28
-rw-r--r--  internal/io/fs/readfile_processor.go           | 55
-rw-r--r--  internal/io/fs/readfile_processor_optimized.go | 97
3 files changed, 102 insertions(+), 78 deletions(-)
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index fab933b..ee486bc 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -76,10 +76,17 @@ func (f readFile) Retry() bool {
func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
lines chan<- *line.Line, re regex.Regex) error {
- reader, fd, err := f.makeReader()
+ reader, fd, decompressor, err := f.makeReader()
if fd != nil {
defer fd.Close()
}
+ if decompressor != nil {
+ defer func() {
+ if closeErr := decompressor.Close(); closeErr != nil {
+ dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr)
+ }
+ }()
+ }
if err != nil {
return err
}
@@ -109,14 +116,14 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
return err
}
-func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) {
+func (f *readFile) makeReader() (*bufio.Reader, *os.File, io.Closer, error) {
if f.filePath == "" && f.globID == "-" {
return f.makePipeReader()
}
return f.makeFileReader()
}
-func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, err error) {
+func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, decompressor io.Closer, err error) {
if fd, err = os.Open(f.filePath); err != nil {
return
}
@@ -127,18 +134,18 @@ func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, err erro
}
}
- reader, err = f.makeCompressedFileReader(fd)
+ reader, decompressor, err = f.makeCompressedFileReader(fd)
return
}
-func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) {
- return bufio.NewReader(os.Stdin), nil, nil
+func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, io.Closer, error) {
+ return bufio.NewReader(os.Stdin), nil, nil, nil
}
func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan<- struct{}) {
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()
-
+
for {
select {
case <-ticker.C:
@@ -153,7 +160,7 @@ func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan<- st
}
}
-func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) {
+func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, decompressor io.Closer, err error) {
switch {
case strings.HasSuffix(f.FilePath(), ".gz"):
fallthrough
@@ -164,10 +171,13 @@ func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader,
if err != nil {
return
}
+ decompressor = gzipReader
reader = bufio.NewReader(gzipReader)
case strings.HasSuffix(f.FilePath(), ".zst"):
dlog.Common.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
+ zstdReader := zstd.NewReader(fd)
+ decompressor = zstdReader
+ reader = bufio.NewReader(zstdReader)
default:
reader = bufio.NewReader(fd)
}
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
index 4e636a4..94a28dc 100644
--- a/internal/io/fs/readfile_processor.go
+++ b/internal/io/fs/readfile_processor.go
@@ -21,10 +21,17 @@ import (
func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext,
processor line.Processor, re regex.Regex) error {
- reader, fd, err := f.makeReader()
+ reader, fd, decompressor, err := f.makeReader()
if fd != nil {
defer fd.Close()
}
+ if decompressor != nil {
+ defer func() {
+ if closeErr := decompressor.Close(); closeErr != nil {
+ dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr)
+ }
+ }()
+ }
if err != nil {
return err
}
@@ -38,7 +45,7 @@ func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext
// Process file with direct callbacks instead of channels
err = f.readWithProcessor(ctx, fd, reader, truncate, ltx, processor, re)
-
+
// Ensure any buffered data is flushed
if flushErr := processor.Flush(); flushErr != nil && err == nil {
err = flushErr
@@ -100,7 +107,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
if err := processor.ProcessFilteredLine(message); err != nil {
return abortReading
}
-
+
f.warnedAboutLongLine = false
return continueReading
@@ -113,7 +120,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
}
// Force a line break
message.WriteByte('\n')
-
+
// Process the line
f.updatePosition()
if err := processor.ProcessFilteredLine(message); err != nil {
@@ -164,19 +171,19 @@ type filteringProcessor struct {
ltx lcontext.LContext
stats *stats
globID string
-
+
// For local context handling
- beforeBuf []*bytes.Buffer
- afterCount int
- maxCount int
- maxReached bool
+ beforeBuf []*bytes.Buffer
+ afterCount int
+ maxCount int
+ maxReached bool
}
// ProcessFilteredLine applies regex filtering before passing to the underlying processor
func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
// Update stats
lineNum := fp.stats.totalLineCount()
-
+
// Simple case: no local context
if !fp.ltx.Has() {
if !fp.re.Match(rawLine.Bytes()) {
@@ -185,10 +192,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
pool.RecycleBytesBuffer(rawLine)
return nil
}
-
+
fp.stats.updateLineMatched()
fp.stats.updateLineTransmitted()
-
+
// Process the line
err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID)
if err != nil {
@@ -196,7 +203,7 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
}
return err
}
-
+
// Complex case: handle local context (before/after/max)
return fp.processWithContext(rawLine, lineNum)
}
@@ -204,10 +211,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
// processWithContext handles lines when local context is enabled
func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum uint64) error {
matched := fp.re.Match(rawLine.Bytes())
-
+
if !matched {
fp.stats.updateLineNotMatched()
-
+
// Handle after context
if fp.ltx.AfterContext > 0 && fp.afterCount > 0 {
fp.afterCount--
@@ -218,7 +225,7 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
}
return err
}
-
+
// Handle before context buffer
if fp.ltx.BeforeContext > 0 {
// Add to before buffer
@@ -231,20 +238,20 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
} else {
pool.RecycleBytesBuffer(rawLine)
}
-
+
fp.stats.updateLineNotTransmitted()
return nil
}
-
+
// Line matched
fp.stats.updateLineMatched()
-
+
// Check if we've reached max count
if fp.maxReached {
pool.RecycleBytesBuffer(rawLine)
return io.EOF // Stop processing
}
-
+
// Process before context
if fp.ltx.BeforeContext > 0 && len(fp.beforeBuf) > 0 {
for i, buf := range fp.beforeBuf {
@@ -260,14 +267,14 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
}
fp.beforeBuf = fp.beforeBuf[:0] // Clear the buffer
}
-
+
// Process the matched line
fp.stats.updateLineTransmitted()
if err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID); err != nil {
pool.RecycleBytesBuffer(rawLine)
return err
}
-
+
// Update max count
if fp.ltx.MaxCount > 0 {
fp.maxCount++
@@ -278,11 +285,11 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
fp.maxReached = true
}
}
-
+
// Reset after context
if fp.ltx.AfterContext > 0 {
fp.afterCount = fp.ltx.AfterContext
}
-
+
return nil
}
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 96d5a16..3e460ed 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -32,23 +32,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
// Use a scanner for efficient line reading
scanner := bufio.NewScanner(reader)
-
+
// Get a buffer from the pool instead of allocating a new one
bufPtr := pool.GetScannerBuffer()
buf := *bufPtr
- maxTokenSize := 1024 * 1024 // 1MB max token size
+ maxTokenSize := 1024 * 1024 // 1MB max token size
scanner.Buffer(buf, maxTokenSize)
-
+
// Ensure we return the buffer to the pool when done
defer pool.PutScannerBuffer(bufPtr)
-
+
// Use custom split function that preserves line endings
scanner.Split(f.scanLinesPreserveEndings)
-
+
// Track truncation checks
lastTruncateCheck := time.Now()
truncateCheckInterval := 3 * time.Second
-
+
lineCount := 0
for scanner.Scan() {
// Check context cancellation
@@ -57,7 +57,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
return nil
default:
}
-
+
// Check for file truncation periodically
if time.Since(lastTruncateCheck) > truncateCheckInterval {
select {
@@ -69,23 +69,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
}
lastTruncateCheck = time.Now()
}
-
+
// Get the line data
lineData := scanner.Bytes()
-
+
// Get a buffer from the pool and copy the data
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(lineData)
-
+
// Process the line
f.updatePosition()
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
-
+
lineCount++
}
-
+
// Check for scanner errors
if err := scanner.Err(); err != nil {
// Handle EOF specially for tailing
@@ -95,7 +95,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
}
return err
}
-
+
return nil
}
@@ -105,9 +105,9 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
if atEOF && len(data) == 0 {
return 0, nil, nil
}
-
+
maxLineLen := config.Server.MaxLineLength
-
+
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// Check if the line before the newline exceeds max length
@@ -116,7 +116,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
// In turbo mode, we handle long lines silently
return maxLineLen, data[0:maxLineLen], nil
}
-
+
// Line is within limit, include the line ending in the token
// Check if there's a \r before the \n
if i > 0 && data[i-1] == '\r' {
@@ -126,7 +126,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
// Unix line ending (\n) - include it in token
return i + 1, data[0 : i+1], nil
}
-
+
// If we're at EOF, we have a final, non-terminated line
if atEOF {
if len(data) > maxLineLen {
@@ -136,14 +136,14 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
}
return len(data), data, nil
}
-
+
// If the line is too long, split it
if len(data) >= maxLineLen {
// Return a chunk up to MaxLineLength
// In turbo mode, we handle long lines silently
return maxLineLen, data[0:maxLineLen], nil
}
-
+
// Request more data
return 0, nil, nil
}
@@ -153,9 +153,9 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
if atEOF && len(data) == 0 {
return 0, nil, nil
}
-
+
maxLineLen := config.Server.MaxLineLength
-
+
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// Check if the line before the newline exceeds max length
@@ -174,7 +174,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
f.warnedAboutLongLine = false // Reset warning for next long line sequence
return i + 1, data[0:i], nil
}
-
+
// If we're at EOF, we have a final, non-terminated line
if atEOF {
if len(data) > maxLineLen {
@@ -190,7 +190,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
}
return len(data), data, nil
}
-
+
// If the line is too long, split it
if len(data) >= maxLineLen {
// Warn about long line (only once)
@@ -199,11 +199,11 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
"Long log line, splitting into multiple lines") + "\n"
f.warnedAboutLongLine = true
}
-
+
// Return a chunk up to MaxLineLength
return maxLineLen, data[0:maxLineLen], nil
}
-
+
// Request more data
return 0, nil, nil
}
@@ -213,10 +213,17 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext,
processor line.Processor, re regex.Regex) error {
- reader, fd, err := f.makeReader()
+ reader, fd, decompressor, err := f.makeReader()
if fd != nil {
defer fd.Close()
}
+ if decompressor != nil {
+ defer func() {
+ if closeErr := decompressor.Close(); closeErr != nil {
+ dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr)
+ }
+ }()
+ }
if err != nil {
return err
}
@@ -224,7 +231,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext
// Create a cancelable context for the truncate check goroutine
truncateCtx, cancelTruncate := context.WithCancel(ctx)
defer cancelTruncate()
-
+
truncate := make(chan struct{})
go f.periodicTruncateCheck(truncateCtx, truncate)
@@ -236,7 +243,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext
// For cat/grep mode, just read once
err = f.readWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re)
-
+
// Ensure any buffered data is flushed
if flushErr := processor.Flush(); flushErr != nil && err == nil {
err = flushErr
@@ -265,45 +272,45 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
// Get a buffer from the pool for reading
bufPtr := pool.GetMediumBuffer()
defer pool.PutMediumBuffer(bufPtr)
-
+
for {
// Read available data using pooled buffer
buf := (*bufPtr)[:cap(*bufPtr)] // Reset to full capacity
n, err := reader.Read(buf)
-
+
if n > 0 {
// Process the data we read
data := buf[:n]
-
+
// Process complete lines
for len(data) > 0 {
// Find newline
idx := bytes.IndexByte(data, '\n')
-
+
if idx >= 0 {
// Complete line found
partialLine.Write(data[:idx])
-
+
// Process the line if it's not empty
if partialLine.Len() > 0 {
f.updatePosition()
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(partialLine.Bytes())
-
+
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
}
-
+
partialLine.Reset()
data = data[idx+1:]
-
+
// Reset long line warning
f.warnedAboutLongLine = false
} else {
// No newline, add to partial line
partialLine.Write(data)
-
+
// Check if line is too long
if partialLine.Len() >= config.Server.MaxLineLength {
if !f.warnedAboutLongLine {
@@ -311,35 +318,35 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
"Long log line, splitting into multiple lines") + "\n"
f.warnedAboutLongLine = true
}
-
+
// Process the partial line
f.updatePosition()
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(partialLine.Bytes())
-
+
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
-
+
partialLine.Reset()
}
-
+
break
}
}
-
+
// Flush processor periodically
if err := processor.Flush(); err != nil {
return err
}
}
-
+
// Handle read errors
if err != nil {
if err != io.EOF {
return err
}
-
+
// EOF handling
select {
case <-ctx.Done():
@@ -352,7 +359,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
// Continue reading after a short delay
}
}
-
+
// Check for cancellation
select {
case <-ctx.Done():