summaryrefslogtreecommitdiff
path: root/internal/io/fs
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-13 21:10:28 +0300
committerPaul Buetow <paul@buetow.org>2021-10-14 20:11:23 +0300
commit7eb36937fdc700f56ea294c83fb9b8e11958a328 (patch)
treeb5ab02d2400767410b8dec5183aa9f83d092f0d4 /internal/io/fs
parent1a3b56d4df31737cd2a3e4369a69db16cdb610d5 (diff)
Merging grep context from master
Diffstat (limited to 'internal/io/fs')
-rw-r--r--internal/io/fs/filereader.go4
-rw-r--r--internal/io/fs/readfile.go149
2 files changed, 144 insertions, 9 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index 7773142..b05fd39 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -4,13 +4,15 @@ import (
"context"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
// FileReader is the interface used on the dtail server to read/cat/grep/mapr...
// a file.
type FileReader interface {
- Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line,
+ re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 92f85b6..88d467e 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -16,6 +16,7 @@ import (
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -62,8 +63,8 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line,
- re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
+ lines chan<- line.Line, re regex.Regex) error {
dlog.Common.Debug("readFile", f)
defer func() {
@@ -102,7 +103,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line,
wg.Add(1)
go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, &wg, rawLines, lines, re)
+ go f.filter(ctx, ltx, &wg, rawLines, lines, re)
err = f.read(ctx, fd, rawLines, truncate)
close(rawLines)
@@ -213,10 +214,27 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup,
- rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+func (f readFile) filter(ctx context.Context, ltx lcontext.LContext,
+ wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line,
+ re regex.Regex) {
defer wg.Done()
+ // Do we have any kind of local context settings? If so then run the more complex
+ // filterWithLContext method.
+ if ltx.Has() {
+ // We can not skip transmitting any lines to the client with a local
+ // grep context specified.
+ f.canSkipLines = false
+ f.filterWithLContext(ctx, ltx, rawLines, lines, re)
+ return
+ }
+
+ f.filterWithoutLContext(ctx, rawLines, lines, re)
+}
+
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
+ lines chan<- line.Line, re regex.Regex) {
+
for {
select {
case line, ok := <-rawLines:
@@ -235,11 +253,126 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup,
}
}
-func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int,
+// Filter log lines matching a given regular expression, however with local grep context.
+func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+
+ // Scenario 1: Finish once maxCount hits found
+ maxCount := ltx.MaxCount
+ processMaxCount := maxCount > 0
+ maxReached := false
+
+ // Scenario 2: Print prev. N lines when current line matches.
+ before := ltx.BeforeContext
+ processBefore := before > 0
+ var beforeBuf chan *bytes.Buffer
+ if processBefore {
+ beforeBuf = make(chan *bytes.Buffer, before)
+ }
+
+ // Screnario 3: Print next N lines when current line matches.
+ after := 0
+ processAfter := ltx.AfterContext > 0
+
+ for lineBytesBuffer := range rawLines {
+ f.updatePosition()
+
+ if !re.Match(lineBytesBuffer.Bytes()) {
+ f.updateLineNotMatched()
+
+ if processAfter && after > 0 {
+ after--
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+
+ } else if processBefore {
+ // Keep last num BeforeContext raw messages.
+ select {
+ case beforeBuf <- lineBytesBuffer:
+ default:
+ pool.RecycleBytesBuffer(<-beforeBuf)
+ beforeBuf <- lineBytesBuffer
+ }
+ }
+ continue
+ }
+
+ f.updateLineMatched()
+
+ if processAfter {
+ if maxReached {
+ return
+ }
+ after = ltx.AfterContext
+ }
+
+ if processBefore {
+ i := uint64(len(beforeBuf))
+ for {
+ select {
+ case lineBytesBuffer := <-beforeBuf:
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount() - i,
+ TransmittedPerc: 100,
+ }
+ i--
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+ default:
+ // beforeBuf is now empty.
+ }
+ if len(beforeBuf) == 0 {
+ break
+ }
+ }
+ }
+
+ line := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- line:
+ if processMaxCount {
+ maxCount--
+ if maxCount == 0 {
+ if !processAfter || after == 0 {
+ return
+ }
+ // Unfortunatley we have to continue filter, as there might be more lines to print
+ maxReached = true
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
re regex.Regex) (line.Line, bool) {
var read line.Line
- if !re.Match(lineBytes.Bytes()) {
+ if !re.Match(lineBytesBuffer.Bytes()) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
return read, false
@@ -254,7 +387,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int,
f.updateLineTransmitted()
read = line.Line{
- Content: lineBytes,
+ Content: lineBytesBuffer,
SourceID: f.globID,
Count: f.totalLineCount(),
TransmittedPerc: f.transmittedPerc(),