diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-13 21:10:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-14 20:11:23 +0300 |
| commit | 7eb36937fdc700f56ea294c83fb9b8e11958a328 (patch) | |
| tree | b5ab02d2400767410b8dec5183aa9f83d092f0d4 /internal/io/fs | |
| parent | 1a3b56d4df31737cd2a3e4369a69db16cdb610d5 (diff) | |
Merging grep context from master
Diffstat (limited to 'internal/io/fs')
| -rw-r--r-- | internal/io/fs/filereader.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 149 |
2 files changed, 144 insertions, 9 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 7773142..b05fd39 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,13 +4,15 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... // a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 92f85b6..88d467e 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -16,6 +16,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -62,8 +63,8 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, - re regex.Regex) error { +func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, + lines chan<- line.Line, re regex.Regex) error { dlog.Common.Debug("readFile", f) defer func() { @@ -102,7 +103,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, wg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + go f.filter(ctx, ltx, &wg, rawLines, lines, re) err = f.read(ctx, fd, rawLines, truncate) close(rawLines) @@ -213,10 +214,27 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { +func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, + wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, + re regex.Regex) { defer wg.Done() + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if ltx.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, ltx, rawLines, lines, re) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, + lines chan<- line.Line, re regex.Regex) { + for { select { case line, ok := <-rawLines: @@ -235,11 +253,126 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, } } -func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Scenario 1: Finish once maxCount hits found + maxCount := ltx.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := ltx.BeforeContext + processBefore := before > 0 + var beforeBuf chan *bytes.Buffer + if processBefore { + beforeBuf = make(chan *bytes.Buffer, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := ltx.AfterContext > 0 + + for lineBytesBuffer := range rawLines { + f.updatePosition() + + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- lineBytesBuffer: + default: + pool.RecycleBytesBuffer(<-beforeBuf) + beforeBuf <- lineBytesBuffer + } + } + continue + } + + f.updateLineMatched() + + if processAfter { + if maxReached { + return + } + after = ltx.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case lineBytesBuffer := <-beforeBuf: + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount() - i, + TransmittedPerc: 100, + } + i-- + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } + } + } + + line := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line - if !re.Match(lineBytes.Bytes()) { + if !re.Match(lineBytesBuffer.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() return read, false @@ -254,7 +387,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, f.updateLineTransmitted() read = line.Line{ - Content: lineBytes, + Content: lineBytesBuffer, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), |
