summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-18 09:16:34 +0300
committerPaul Buetow <paul@buetow.org>2025-06-18 09:16:34 +0300
commitbc62af9dc52dbfb0eb8569a78849ae4d0dd4fc57 (patch)
tree3cc797b07544e15cd8e0ccf3a1010cea3d8a8100
parent67a6b9d8e8e8dc83d5ea3e5859e631a0dfa9dabe (diff)
Remove old channel-based implementation files
- Delete obsolete readfile.go, readfilelcontext.go, tailfile.go, catfile.go - Clean up deprecated comments in readcommand.go - Add *.query to .gitignore for temporary test files - DTail now operates purely in channelless mode - All tests passing after cleanup 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
-rw-r--r--.gitignore1
-rw-r--r--internal/io/fs/catfile.go20
-rw-r--r--internal/io/fs/readfile.go345
-rw-r--r--internal/io/fs/readfilelcontext.go214
-rw-r--r--internal/io/fs/tailfile.go20
-rw-r--r--internal/server/handlers/readcommand.go3
6 files changed, 2 insertions, 601 deletions
diff --git a/.gitignore b/.gitignore
index 6d98334..9a1dda1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
*_proprietary.go
*.csv
*.tmp
+*.query
*.out
cache/
log/
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
deleted file mode 100644
index e4676f3..0000000
--- a/internal/io/fs/catfile.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package fs
-
-// CatFile is for reading a whole file.
-type CatFile struct {
- readFile
-}
-
-// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string) CatFile {
- return CatFile{
- readFile: readFile{
- filePath: filePath,
- globID: globID,
- serverMessages: serverMessages,
- retry: false,
- canSkipLines: false,
- seekEOF: false,
- },
- }
-}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
deleted file mode 100644
index 3dafd9b..0000000
--- a/internal/io/fs/readfile.go
+++ /dev/null
@@ -1,345 +0,0 @@
-package fs
-
-import (
- "bufio"
- "bytes"
- "compress/gzip"
- "context"
- "errors"
- "fmt"
- "io"
- "os"
- "strings"
- "sync"
- "time"
-
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/dlog"
- "github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/pool"
- "github.com/mimecast/dtail/internal/lcontext"
- "github.com/mimecast/dtail/internal/regex"
-
- "github.com/DataDog/zstd"
-)
-
-type readStatus int
-
-const (
- nothing readStatus = iota
- abortReading readStatus = iota
- continueReading readStatus = iota
-)
-
-// Used to tail and filter a local log file.
-type readFile struct {
- // Various statistics (e.g. regex hit percentage, transfer percentage).
- stats
- // Path of log file to tail.
- filePath string
- // The glob identifier of the file.
- globID string
- // Channel to send a server message to the dtail client
- serverMessages chan<- string
- // Periodically retry reading file.
- retry bool
- // Can I skip messages when there are too many?
- canSkipLines bool
- // Seek to the EOF before processing file?
- seekEOF bool
- // Warned already about a long line.
- warnedAboutLongLine bool
-}
-
-// String returns the string representation of the readFile
-func (f readFile) String() string {
- return fmt.Sprintf(
- "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
- f.filePath,
- f.globID,
- f.retry,
- f.canSkipLines,
- f.seekEOF)
-}
-
-// FilePath returns the full file path.
-func (f readFile) FilePath() string {
- return f.filePath
-}
-
-// Retry reading the file on error?
-func (f readFile) Retry() bool {
- return f.retry
-}
-
-// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
- lines chan<- *line.Line, re regex.Regex) error {
-
- reader, fd, err := f.makeReader()
- if fd != nil {
- defer fd.Close()
- }
- if err != nil {
- return err
- }
-
- rawLines := make(chan *bytes.Buffer, 100)
- truncate := make(chan struct{})
-
- readCtx, readCancel := context.WithCancel(ctx)
- var filterWg sync.WaitGroup
- filterWg.Add(1)
-
- go f.periodicTruncateCheck(ctx, truncate)
- go func() {
- f.filter(ctx, ltx, rawLines, lines, re)
- filterWg.Done()
- // If the filter stopped, make the reader stop too, no need to read
- // more data if there is nothing more the filter wants to filter for!
- // E.g. it could be that we only want to filter N matches but not more.
- readCancel()
- }()
-
- err = f.read(readCtx, fd, reader, rawLines, truncate)
- close(rawLines)
- // Filter may sends some data still. So wait until it is done here.
- filterWg.Wait()
-
- return err
-}
-
-func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) {
- if f.filePath == "" && f.globID == "-" {
- return f.makePipeReader()
- }
- return f.makeFileReader()
-}
-
-func (f *readFile) makeFileReader() (*bufio.Reader, *os.File, error) {
- var reader *bufio.Reader
- fd, err := os.Open(f.filePath)
- if err != nil {
- return reader, fd, err
- }
-
- if f.seekEOF {
- fd.Seek(0, io.SeekEnd)
- }
-
- reader, err = f.makeCompressedFileReader(fd)
- if err != nil {
- return reader, fd, err
- }
-
- return reader, fd, nil
-}
-
-func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) {
- return bufio.NewReader(os.Stdin), nil, nil
-}
-
-func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- dlog.Common.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- dlog.Common.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
- return
-}
-
-func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
- rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
-
- // Use chunked reader for better performance
- chunkedReader := NewChunkedReader(reader, 64*1024) // 64KB chunks
-
- // Create a goroutine to handle truncate signals
- go func() {
- select {
- case <-truncate:
- // Handle truncation by attempting to seek to end of file
- if fd != nil {
- fd.Seek(0, io.SeekEnd)
- }
- case <-ctx.Done():
- return
- }
- }()
-
- // Process lines using chunked reader
- for {
- err := chunkedReader.ProcessLines(ctx, rawLines, config.Server.MaxLineLength,
- f.filePath, f.serverMessages, f.seekEOF)
- if err != nil {
- if err == io.EOF || err == context.Canceled {
- return nil
- }
- // Handle read errors similar to original implementation
- time.Sleep(time.Millisecond * 100)
- continue
- }
- return nil
- }
-}
-
-// Filter log lines matching a given regular expression.
-func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext,
- rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) {
-
- // Do we have any kind of local context settings? If so then run the more complex
- // filterWithLContext method.
- if ltx.Has() {
- // We can not skip transmitting any lines to the client with a local
- // grep context specified.
- f.canSkipLines = false
- f.filterWithLContext(ctx, ltx, rawLines, lines, re)
- return
- }
-
- f.filterWithoutLContext(ctx, rawLines, lines, re)
-}
-
-func (f *readFile) transmittable(rawLine *bytes.Buffer, length, capacity int,
- re regex.Regex) (*line.Line, bool) {
-
- newLine := line.Null()
- if !re.Match(rawLine.Bytes()) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return newLine, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return newLine, false
- }
- f.updateLineTransmitted()
-
- return line.New(rawLine, f.totalLineCount(), f.transmittedPerc(), f.globID), true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f *readFile) truncated(fd *os.File) (bool, error) {
- if fd == nil {
- return false, nil
- }
-
- dlog.Common.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- currentPosition, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPosition, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
- if currentPosition > pathPosition {
- return true, errors.New("File got truncated")
- }
- return false, nil
-}
-
-// Deal with the scenario that nothing could be read from the fd.
-func (f *readFile) handleReadError(ctx context.Context, err error, fd *os.File,
- rawLines chan *bytes.Buffer, truncate <-chan struct{},
- message *bytes.Buffer) (readStatus, error) {
-
- if err != io.EOF {
- return abortReading, err
- }
-
- select {
- case <-truncate:
- if isTruncated, err := f.truncated(fd); isTruncated {
- return abortReading, err
- }
- case <-ctx.Done():
- return abortReading, nil
- default:
- }
-
- if !f.seekEOF {
- dlog.Common.Info(f.FilePath(), "End of file reached")
- if len(message.Bytes()) > 0 {
- select {
- case rawLines <- message:
- case <-ctx.Done():
- }
- }
- return abortReading, nil
- }
-
- return nothing, nil
-}
-
-// Now process the byte we just read from the fd.
-func (f *readFile) handleReadByte(ctx context.Context, b byte,
- rawLines chan *bytes.Buffer, message *bytes.Buffer) (readStatus, *bytes.Buffer) {
-
- switch b {
- case '\n':
- select {
- case rawLines <- message:
- message = pool.BytesBuffer.Get().(*bytes.Buffer)
- f.warnedAboutLongLine = false
- case <-ctx.Done():
- return abortReading, message
- }
- default:
- if message.Len() >= config.Server.MaxLineLength {
- if !f.warnedAboutLongLine {
- f.serverMessages <- dlog.Common.Warn(f.filePath,
- "Long log line, splitting into multiple lines") + "\n"
- f.warnedAboutLongLine = true
- }
- message.WriteByte('\n')
- select {
- case rawLines <- message:
- message = pool.BytesBuffer.Get().(*bytes.Buffer)
- case <-ctx.Done():
- return abortReading, message
- }
- }
- }
-
- return nothing, message
-}
diff --git a/internal/io/fs/readfilelcontext.go b/internal/io/fs/readfilelcontext.go
deleted file mode 100644
index 44ce17d..0000000
--- a/internal/io/fs/readfilelcontext.go
+++ /dev/null
@@ -1,214 +0,0 @@
-package fs
-
-import (
- "bytes"
- "context"
-
- "github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/pool"
- "github.com/mimecast/dtail/internal/lcontext"
- "github.com/mimecast/dtail/internal/regex"
-)
-
-// The local context state.
-type ltxState struct {
- // Max state
- maxCount int
- processMaxCount bool
- maxReached bool
-
- // Before state
- before int
- processBefore bool
- beforeBuf chan *bytes.Buffer
-
- // After state
- after int
- processAfter bool
-}
-
-// We don't have any local grep context, which makes life much simpler and more efficient.
-func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
- lines chan<- *line.Line, re regex.Regex) {
-
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
- if newLine, ok := f.transmittable(rawLine, len(lines), cap(lines), re); ok {
- select {
- case lines <- newLine:
- case <-ctx.Done():
- return
- }
- }
- }
- }
-}
-
-// Filter log lines matching a given regular expression, however with local grep context.
-func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
- rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) {
-
- var ls ltxState
-
- // The following 3 scenarios may also be used at once/any combination together.
-
- // Scenario 1: Finish once maxCount hits found
- ls.maxCount = ltx.MaxCount
- ls.processMaxCount = ls.maxCount > 0
- ls.maxReached = false
-
- // Scenario 2: Print prev. N lines when current line matches.
- ls.before = ltx.BeforeContext
- ls.processBefore = ls.before > 0
- if ls.processBefore {
- ls.beforeBuf = make(chan *bytes.Buffer, ls.before)
- }
-
- // Screnario 3: Print next N lines when current line matches.
- ls.after = 0
- ls.processAfter = ltx.AfterContext > 0
-
- // No go through all raw lines read to determine with they satisfy the local
- // context or not. "Matching" lines will be sent to the lines channel.
- for rawLine := range rawLines {
- status := f.filterLineWithLContext(ctx, &ltx, &ls, rawLines, lines, &re, rawLine)
- switch status {
- case abortReading:
- return
- default:
- }
- }
-}
-
-// Filter log lines matching a given regular expression, however with local grep context.
-func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LContext,
- ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re *regex.Regex,
- rawLine *bytes.Buffer) readStatus {
-
- f.updatePosition()
-
- if !re.Match(rawLine.Bytes()) {
- f.updateLineNotMatched()
- status := f.lContextNotMatched(ctx, ls, lines, rawLine)
- switch status {
- case nothing:
- default:
- return status
- }
- }
-
- f.updateLineMatched()
-
- // If we have an "after" context to worry about...
- if ls.processAfter {
- if ls.maxReached {
- // We have reached the "max" hits. Stop/abort reading.
- return abortReading
- }
- // Reset the "after" context.
- ls.after = ltx.AfterContext
- }
-
- // If we have a "before" context to worry about...
- if ls.processBefore {
- status := f.lContextProcessBefore(ctx, ls, lines, rawLine)
- switch status {
- case nothing:
- default:
- return status
- }
- }
-
- line := line.New(rawLine, f.totalLineCount(), 100, f.globID)
-
- select {
- case lines <- line:
- // If we have a "max" context to worry about...
- if ls.processMaxCount {
- status := f.lContextProcessMaxCount(ctx, ls)
- switch status {
- case nothing:
- default:
- return status
- }
- }
- case <-ctx.Done():
- return abortReading
- }
-
- return nothing
-}
-
-// Do some post-processing for the "after" and the "before" contexts in case the
-// line didn't match the regex.
-func (f *readFile) lContextNotMatched(ctx context.Context, ls *ltxState,
- lines chan<- *line.Line, rawLine *bytes.Buffer) readStatus {
-
- if ls.processAfter && ls.after > 0 {
- ls.after--
- myLine := line.New(rawLine, f.totalLineCount(), 100, f.globID)
-
- select {
- case lines <- myLine:
- case <-ctx.Done():
- return abortReading
- }
-
- } else if ls.processBefore {
- // Keep last num BeforeContext raw messages.
- select {
- case ls.beforeBuf <- rawLine:
- default:
- pool.RecycleBytesBuffer(<-ls.beforeBuf)
- ls.beforeBuf <- rawLine
- }
- }
-
- return continueReading
-}
-
-// Do some processing for the "before" context.
-func (f *readFile) lContextProcessBefore(ctx context.Context,
- ls *ltxState, lines chan<- *line.Line, rawLine *bytes.Buffer) readStatus {
-
- i := uint64(len(ls.beforeBuf))
- for {
- select {
- case rawLine := <-ls.beforeBuf:
- myLine := line.New(rawLine, f.totalLineCount()-i, 100, f.globID)
- i--
-
- select {
- case lines <- myLine:
- case <-ctx.Done():
- return abortReading
- }
- default:
- // beforeBuf is now empty.
- }
- if len(ls.beforeBuf) == 0 {
- break
- }
- }
-
- return nothing
-}
-
-// Do some processing for the "max" context.
-func (f *readFile) lContextProcessMaxCount(ctx context.Context, ls *ltxState) readStatus {
- ls.maxCount--
- if ls.maxCount == 0 {
- if !ls.processAfter || ls.after == 0 {
- return abortReading
- }
- // Unfortunatley we have to continue filter, as there might be more lines to print
- ls.maxReached = true
- }
-
- return nothing
-}
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
deleted file mode 100644
index 7a40ac4..0000000
--- a/internal/io/fs/tailfile.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package fs
-
-// TailFile is to tail and filter a log file.
-type TailFile struct {
- readFile
-}
-
-// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string) TailFile {
- return TailFile{
- readFile: readFile{
- filePath: filePath,
- globID: globID,
- serverMessages: serverMessages,
- retry: true,
- canSkipLines: true,
- seekEOF: true,
- },
- }
-}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 59e9d1b..967cae4 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -51,8 +51,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
return
}
- // Always use channelless mode now - channel-based code is deprecated
- dlog.Server.Debug("Using channelless processing mode for mode:", r.mode)
+ dlog.Server.Debug("Processing mode:", r.mode)
r.startChannelless(ctx, ltx, args, re, retries, queryStr)
}