diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-18 09:16:34 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-18 09:16:34 +0300 |
| commit | bc62af9dc52dbfb0eb8569a78849ae4d0dd4fc57 (patch) | |
| tree | 3cc797b07544e15cd8e0ccf3a1010cea3d8a8100 | |
| parent | 67a6b9d8e8e8dc83d5ea3e5859e631a0dfa9dabe (diff) | |
Remove old channel-based implementation files
- Delete obsolete readfile.go, readfilelcontext.go, tailfile.go, catfile.go
- Clean up deprecated comments in readcommand.go
- Add *.query to .gitignore for temporary test files
- DTail now operates purely in channelless mode
- All tests passing after cleanup
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | internal/io/fs/catfile.go | 20 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 345 | ||||
| -rw-r--r-- | internal/io/fs/readfilelcontext.go | 214 | ||||
| -rw-r--r-- | internal/io/fs/tailfile.go | 20 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 3 |
6 files changed, 2 insertions, 601 deletions
@@ -1,6 +1,7 @@ *_proprietary.go *.csv *.tmp +*.query *.out cache/ log/ diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go deleted file mode 100644 index e4676f3..0000000 --- a/internal/io/fs/catfile.go +++ /dev/null @@ -1,20 +0,0 @@ -package fs - -// CatFile is for reading a whole file. -type CatFile struct { - readFile -} - -// NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile { - return CatFile{ - readFile: readFile{ - filePath: filePath, - globID: globID, - serverMessages: serverMessages, - retry: false, - canSkipLines: false, - seekEOF: false, - }, - } -} diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go deleted file mode 100644 index 3dafd9b..0000000 --- a/internal/io/fs/readfile.go +++ /dev/null @@ -1,345 +0,0 @@ -package fs - -import ( - "bufio" - "bytes" - "compress/gzip" - "context" - "errors" - "fmt" - "io" - "os" - "strings" - "sync" - "time" - - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/pool" - "github.com/mimecast/dtail/internal/lcontext" - "github.com/mimecast/dtail/internal/regex" - - "github.com/DataDog/zstd" -) - -type readStatus int - -const ( - nothing readStatus = iota - abortReading readStatus = iota - continueReading readStatus = iota -) - -// Used to tail and filter a local log file. -type readFile struct { - // Various statistics (e.g. regex hit percentage, transfer percentage). - stats - // Path of log file to tail. - filePath string - // The glob identifier of the file. - globID string - // Channel to send a server message to the dtail client - serverMessages chan<- string - // Periodically retry reading file. - retry bool - // Can I skip messages when there are too many? - canSkipLines bool - // Seek to the EOF before processing file? - seekEOF bool - // Warned already about a long line. 
- warnedAboutLongLine bool -} - -// String returns the string representation of the readFile -func (f readFile) String() string { - return fmt.Sprintf( - "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", - f.filePath, - f.globID, - f.retry, - f.canSkipLines, - f.seekEOF) -} - -// FilePath returns the full file path. -func (f readFile) FilePath() string { - return f.filePath -} - -// Retry reading the file on error? -func (f readFile) Retry() bool { - return f.retry -} - -// Start tailing a log file. -func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, - lines chan<- *line.Line, re regex.Regex) error { - - reader, fd, err := f.makeReader() - if fd != nil { - defer fd.Close() - } - if err != nil { - return err - } - - rawLines := make(chan *bytes.Buffer, 100) - truncate := make(chan struct{}) - - readCtx, readCancel := context.WithCancel(ctx) - var filterWg sync.WaitGroup - filterWg.Add(1) - - go f.periodicTruncateCheck(ctx, truncate) - go func() { - f.filter(ctx, ltx, rawLines, lines, re) - filterWg.Done() - // If the filter stopped, make the reader stop too, no need to read - // more data if there is nothing more the filter wants to filter for! - // E.g. it could be that we only want to filter N matches but not more. - readCancel() - }() - - err = f.read(readCtx, fd, reader, rawLines, truncate) - close(rawLines) - // Filter may sends some data still. So wait until it is done here. 
- filterWg.Wait() - - return err -} - -func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) { - if f.filePath == "" && f.globID == "-" { - return f.makePipeReader() - } - return f.makeFileReader() -} - -func (f *readFile) makeFileReader() (*bufio.Reader, *os.File, error) { - var reader *bufio.Reader - fd, err := os.Open(f.filePath) - if err != nil { - return reader, fd, err - } - - if f.seekEOF { - fd.Seek(0, io.SeekEnd) - } - - reader, err = f.makeCompressedFileReader(fd) - if err != nil { - return reader, fd, err - } - - return reader, fd, nil -} - -func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) { - return bufio.NewReader(os.Stdin), nil, nil -} - -func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - dlog.Common.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - dlog.Common.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } - return -} - -func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, - rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { - - // Use chunked reader for better performance - chunkedReader := NewChunkedReader(reader, 64*1024) // 64KB chunks - - // Create a goroutine to handle truncate signals - go func() { - select { - case <-truncate: - // Handle truncation by attempting to seek 
to end of file - if fd != nil { - fd.Seek(0, io.SeekEnd) - } - case <-ctx.Done(): - return - } - }() - - // Process lines using chunked reader - for { - err := chunkedReader.ProcessLines(ctx, rawLines, config.Server.MaxLineLength, - f.filePath, f.serverMessages, f.seekEOF) - if err != nil { - if err == io.EOF || err == context.Canceled { - return nil - } - // Handle read errors similar to original implementation - time.Sleep(time.Millisecond * 100) - continue - } - return nil - } -} - -// Filter log lines matching a given regular expression. -func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext, - rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) { - - // Do we have any kind of local context settings? If so then run the more complex - // filterWithLContext method. - if ltx.Has() { - // We can not skip transmitting any lines to the client with a local - // grep context specified. - f.canSkipLines = false - f.filterWithLContext(ctx, ltx, rawLines, lines, re) - return - } - - f.filterWithoutLContext(ctx, rawLines, lines, re) -} - -func (f *readFile) transmittable(rawLine *bytes.Buffer, length, capacity int, - re regex.Regex) (*line.Line, bool) { - - newLine := line.Null() - if !re.Match(rawLine.Bytes()) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return newLine, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return newLine, false - } - f.updateLineTransmitted() - - return line.New(rawLine, f.totalLineCount(), f.transmittedPerc(), f.globID), true -} - -// Check wether log file is truncated. Returns nil if not. -func (f *readFile) truncated(fd *os.File) (bool, error) { - if fd == nil { - return false, nil - } - - dlog.Common.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. 
- currentPosition, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPosition, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - if currentPosition > pathPosition { - return true, errors.New("File got truncated") - } - return false, nil -} - -// Deal with the scenario that nothing could be read from the fd. -func (f *readFile) handleReadError(ctx context.Context, err error, fd *os.File, - rawLines chan *bytes.Buffer, truncate <-chan struct{}, - message *bytes.Buffer) (readStatus, error) { - - if err != io.EOF { - return abortReading, err - } - - select { - case <-truncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return abortReading, err - } - case <-ctx.Done(): - return abortReading, nil - default: - } - - if !f.seekEOF { - dlog.Common.Info(f.FilePath(), "End of file reached") - if len(message.Bytes()) > 0 { - select { - case rawLines <- message: - case <-ctx.Done(): - } - } - return abortReading, nil - } - - return nothing, nil -} - -// Now process the byte we just read from the fd. 
-func (f *readFile) handleReadByte(ctx context.Context, b byte, - rawLines chan *bytes.Buffer, message *bytes.Buffer) (readStatus, *bytes.Buffer) { - - switch b { - case '\n': - select { - case rawLines <- message: - message = pool.BytesBuffer.Get().(*bytes.Buffer) - f.warnedAboutLongLine = false - case <-ctx.Done(): - return abortReading, message - } - default: - if message.Len() >= config.Server.MaxLineLength { - if !f.warnedAboutLongLine { - f.serverMessages <- dlog.Common.Warn(f.filePath, - "Long log line, splitting into multiple lines") + "\n" - f.warnedAboutLongLine = true - } - message.WriteByte('\n') - select { - case rawLines <- message: - message = pool.BytesBuffer.Get().(*bytes.Buffer) - case <-ctx.Done(): - return abortReading, message - } - } - } - - return nothing, message -} diff --git a/internal/io/fs/readfilelcontext.go b/internal/io/fs/readfilelcontext.go deleted file mode 100644 index 44ce17d..0000000 --- a/internal/io/fs/readfilelcontext.go +++ /dev/null @@ -1,214 +0,0 @@ -package fs - -import ( - "bytes" - "context" - - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/pool" - "github.com/mimecast/dtail/internal/lcontext" - "github.com/mimecast/dtail/internal/regex" -) - -// The local context state. -type ltxState struct { - // Max state - maxCount int - processMaxCount bool - maxReached bool - - // Before state - before int - processBefore bool - beforeBuf chan *bytes.Buffer - - // After state - after int - processAfter bool -} - -// We don't have any local grep context, which makes life much simpler and more efficient. 
-func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, - lines chan<- *line.Line, re regex.Regex) { - - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - if newLine, ok := f.transmittable(rawLine, len(lines), cap(lines), re); ok { - select { - case lines <- newLine: - case <-ctx.Done(): - return - } - } - } - } -} - -// Filter log lines matching a given regular expression, however with local grep context. -func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, - rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) { - - var ls ltxState - - // The following 3 scenarios may also be used at once/any combination together. - - // Scenario 1: Finish once maxCount hits found - ls.maxCount = ltx.MaxCount - ls.processMaxCount = ls.maxCount > 0 - ls.maxReached = false - - // Scenario 2: Print prev. N lines when current line matches. - ls.before = ltx.BeforeContext - ls.processBefore = ls.before > 0 - if ls.processBefore { - ls.beforeBuf = make(chan *bytes.Buffer, ls.before) - } - - // Screnario 3: Print next N lines when current line matches. - ls.after = 0 - ls.processAfter = ltx.AfterContext > 0 - - // No go through all raw lines read to determine with they satisfy the local - // context or not. "Matching" lines will be sent to the lines channel. - for rawLine := range rawLines { - status := f.filterLineWithLContext(ctx, &ltx, &ls, rawLines, lines, &re, rawLine) - switch status { - case abortReading: - return - default: - } - } -} - -// Filter log lines matching a given regular expression, however with local grep context. 
-func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LContext, - ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re *regex.Regex, - rawLine *bytes.Buffer) readStatus { - - f.updatePosition() - - if !re.Match(rawLine.Bytes()) { - f.updateLineNotMatched() - status := f.lContextNotMatched(ctx, ls, lines, rawLine) - switch status { - case nothing: - default: - return status - } - } - - f.updateLineMatched() - - // If we have an "after" context to worry about... - if ls.processAfter { - if ls.maxReached { - // We have reached the "max" hits. Stop/abort reading. - return abortReading - } - // Reset the "after" context. - ls.after = ltx.AfterContext - } - - // If we have a "before" context to worry about... - if ls.processBefore { - status := f.lContextProcessBefore(ctx, ls, lines, rawLine) - switch status { - case nothing: - default: - return status - } - } - - line := line.New(rawLine, f.totalLineCount(), 100, f.globID) - - select { - case lines <- line: - // If we have a "max" context to worry about... - if ls.processMaxCount { - status := f.lContextProcessMaxCount(ctx, ls) - switch status { - case nothing: - default: - return status - } - } - case <-ctx.Done(): - return abortReading - } - - return nothing -} - -// Do some post-processing for the "after" and the "before" contexts in case the -// line didn't match the regex. -func (f *readFile) lContextNotMatched(ctx context.Context, ls *ltxState, - lines chan<- *line.Line, rawLine *bytes.Buffer) readStatus { - - if ls.processAfter && ls.after > 0 { - ls.after-- - myLine := line.New(rawLine, f.totalLineCount(), 100, f.globID) - - select { - case lines <- myLine: - case <-ctx.Done(): - return abortReading - } - - } else if ls.processBefore { - // Keep last num BeforeContext raw messages. 
- select { - case ls.beforeBuf <- rawLine: - default: - pool.RecycleBytesBuffer(<-ls.beforeBuf) - ls.beforeBuf <- rawLine - } - } - - return continueReading -} - -// Do some processing for the "before" context. -func (f *readFile) lContextProcessBefore(ctx context.Context, - ls *ltxState, lines chan<- *line.Line, rawLine *bytes.Buffer) readStatus { - - i := uint64(len(ls.beforeBuf)) - for { - select { - case rawLine := <-ls.beforeBuf: - myLine := line.New(rawLine, f.totalLineCount()-i, 100, f.globID) - i-- - - select { - case lines <- myLine: - case <-ctx.Done(): - return abortReading - } - default: - // beforeBuf is now empty. - } - if len(ls.beforeBuf) == 0 { - break - } - } - - return nothing -} - -// Do some processing for the "max" context. -func (f *readFile) lContextProcessMaxCount(ctx context.Context, ls *ltxState) readStatus { - ls.maxCount-- - if ls.maxCount == 0 { - if !ls.processAfter || ls.after == 0 { - return abortReading - } - // Unfortunatley we have to continue filter, as there might be more lines to print - ls.maxReached = true - } - - return nothing -} diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go deleted file mode 100644 index 7a40ac4..0000000 --- a/internal/io/fs/tailfile.go +++ /dev/null @@ -1,20 +0,0 @@ -package fs - -// TailFile is to tail and filter a log file. -type TailFile struct { - readFile -} - -// NewTailFile returns a new file tailer. 
-func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile { - return TailFile{ - readFile: readFile{ - filePath: filePath, - globID: globID, - serverMessages: serverMessages, - retry: true, - canSkipLines: true, - seekEOF: true, - }, - } -} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 59e9d1b..967cae4 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -51,8 +51,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, return } - // Always use channelless mode now - channel-based code is deprecated - dlog.Server.Debug("Using channelless processing mode for mode:", r.mode) + dlog.Server.Debug("Processing mode:", r.mode) r.startChannelless(ctx, ltx, args, re, retries, queryStr) } |
