summaryrefslogtreecommitdiff
path: root/internal/io/fs/readfile_processor_optimized.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io/fs/readfile_processor_optimized.go')
-rw-r--r--internal/io/fs/readfile_processor_optimized.go97
1 files changed, 52 insertions, 45 deletions
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
index 96d5a16..3e460ed 100644
--- a/internal/io/fs/readfile_processor_optimized.go
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -32,23 +32,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
// Use a scanner for efficient line reading
scanner := bufio.NewScanner(reader)
-
+
// Get a buffer from the pool instead of allocating a new one
bufPtr := pool.GetScannerBuffer()
buf := *bufPtr
- maxTokenSize := 1024 * 1024 // 1MB max token size
+ maxTokenSize := 1024 * 1024 // 1MB max token size
scanner.Buffer(buf, maxTokenSize)
-
+
// Ensure we return the buffer to the pool when done
defer pool.PutScannerBuffer(bufPtr)
-
+
// Use custom split function that preserves line endings
scanner.Split(f.scanLinesPreserveEndings)
-
+
// Track truncation checks
lastTruncateCheck := time.Now()
truncateCheckInterval := 3 * time.Second
-
+
lineCount := 0
for scanner.Scan() {
// Check context cancellation
@@ -57,7 +57,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
return nil
default:
}
-
+
// Check for file truncation periodically
if time.Since(lastTruncateCheck) > truncateCheckInterval {
select {
@@ -69,23 +69,23 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
}
lastTruncateCheck = time.Now()
}
-
+
// Get the line data
lineData := scanner.Bytes()
-
+
// Get a buffer from the pool and copy the data
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(lineData)
-
+
// Process the line
f.updatePosition()
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
-
+
lineCount++
}
-
+
// Check for scanner errors
if err := scanner.Err(); err != nil {
// Handle EOF specially for tailing
@@ -95,7 +95,7 @@ func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File,
}
return err
}
-
+
return nil
}
@@ -105,9 +105,9 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
if atEOF && len(data) == 0 {
return 0, nil, nil
}
-
+
maxLineLen := config.Server.MaxLineLength
-
+
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// Check if the line before the newline exceeds max length
@@ -116,7 +116,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
// In turbo mode, we handle long lines silently
return maxLineLen, data[0:maxLineLen], nil
}
-
+
// Line is within limit, include the line ending in the token
// Check if there's a \r before the \n
if i > 0 && data[i-1] == '\r' {
@@ -126,7 +126,7 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
// Unix line ending (\n) - include it in token
return i + 1, data[0 : i+1], nil
}
-
+
// If we're at EOF, we have a final, non-terminated line
if atEOF {
if len(data) > maxLineLen {
@@ -136,14 +136,14 @@ func (f *readFile) scanLinesPreserveEndings(data []byte, atEOF bool) (advance in
}
return len(data), data, nil
}
-
+
// If the line is too long, split it
if len(data) >= maxLineLen {
// Return a chunk up to MaxLineLength
// In turbo mode, we handle long lines silently
return maxLineLen, data[0:maxLineLen], nil
}
-
+
// Request more data
return 0, nil, nil
}
@@ -153,9 +153,9 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
if atEOF && len(data) == 0 {
return 0, nil, nil
}
-
+
maxLineLen := config.Server.MaxLineLength
-
+
// Look for a newline
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// Check if the line before the newline exceeds max length
@@ -174,7 +174,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
f.warnedAboutLongLine = false // Reset warning for next long line sequence
return i + 1, data[0:i], nil
}
-
+
// If we're at EOF, we have a final, non-terminated line
if atEOF {
if len(data) > maxLineLen {
@@ -190,7 +190,7 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
}
return len(data), data, nil
}
-
+
// If the line is too long, split it
if len(data) >= maxLineLen {
// Warn about long line (only once)
@@ -199,11 +199,11 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
"Long log line, splitting into multiple lines") + "\n"
f.warnedAboutLongLine = true
}
-
+
// Return a chunk up to MaxLineLength
return maxLineLen, data[0:maxLineLen], nil
}
-
+
// Request more data
return 0, nil, nil
}
@@ -213,10 +213,17 @@ func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int,
func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext,
processor line.Processor, re regex.Regex) error {
- reader, fd, err := f.makeReader()
+ reader, fd, decompressor, err := f.makeReader()
if fd != nil {
defer fd.Close()
}
+ if decompressor != nil {
+ defer func() {
+ if closeErr := decompressor.Close(); closeErr != nil {
+ dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr)
+ }
+ }()
+ }
if err != nil {
return err
}
@@ -224,7 +231,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext
// Create a cancelable context for the truncate check goroutine
truncateCtx, cancelTruncate := context.WithCancel(ctx)
defer cancelTruncate()
-
+
truncate := make(chan struct{})
go f.periodicTruncateCheck(truncateCtx, truncate)
@@ -236,7 +243,7 @@ func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext
// For cat/grep mode, just read once
err = f.readWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re)
-
+
// Ensure any buffered data is flushed
if flushErr := processor.Flush(); flushErr != nil && err == nil {
err = flushErr
@@ -265,45 +272,45 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
// Get a buffer from the pool for reading
bufPtr := pool.GetMediumBuffer()
defer pool.PutMediumBuffer(bufPtr)
-
+
for {
// Read available data using pooled buffer
buf := (*bufPtr)[:cap(*bufPtr)] // Reset to full capacity
n, err := reader.Read(buf)
-
+
if n > 0 {
// Process the data we read
data := buf[:n]
-
+
// Process complete lines
for len(data) > 0 {
// Find newline
idx := bytes.IndexByte(data, '\n')
-
+
if idx >= 0 {
// Complete line found
partialLine.Write(data[:idx])
-
+
// Process the line if it's not empty
if partialLine.Len() > 0 {
f.updatePosition()
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(partialLine.Bytes())
-
+
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
}
-
+
partialLine.Reset()
data = data[idx+1:]
-
+
// Reset long line warning
f.warnedAboutLongLine = false
} else {
// No newline, add to partial line
partialLine.Write(data)
-
+
// Check if line is too long
if partialLine.Len() >= config.Server.MaxLineLength {
if !f.warnedAboutLongLine {
@@ -311,35 +318,35 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
"Long log line, splitting into multiple lines") + "\n"
f.warnedAboutLongLine = true
}
-
+
// Process the partial line
f.updatePosition()
lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
lineBuf.Write(partialLine.Bytes())
-
+
if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
return err
}
-
+
partialLine.Reset()
}
-
+
break
}
}
-
+
// Flush processor periodically
if err := processor.Flush(); err != nil {
return err
}
}
-
+
// Handle read errors
if err != nil {
if err != io.EOF {
return err
}
-
+
// EOF handling
select {
case <-ctx.Done():
@@ -352,7 +359,7 @@ func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File,
// Continue reading after a short delay
}
}
-
+
// Check for cancellation
select {
case <-ctx.Done():