summaryrefslogtreecommitdiff
path: root/fs/readfile.go
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-09 20:30:15 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-01-09 20:30:15 +0000
commit3755a9911ecb05886577095f2b8cc8b9e4066a3a (patch)
tree86e24bc466986cb5c9c6d167a918e6064defeafc /fs/readfile.go
Release of DTail v1.0.0v1.0.0
Diffstat (limited to 'fs/readfile.go')
-rw-r--r--fs/readfile.go318
1 files changed, 318 insertions, 0 deletions
diff --git a/fs/readfile.go b/fs/readfile.go
new file mode 100644
index 0000000..375378b
--- /dev/null
+++ b/fs/readfile.go
@@ -0,0 +1,318 @@
+package fs
+
+import (
+ "bufio"
+ "compress/gzip"
+ "dtail/logger"
+ "errors"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/DataDog/zstd"
+)
+
// Used to tail and filter a local log file.
type readFile struct {
	// Various statistics (e.g. regex hit percentage, transfer percentage),
	// embedded from the project-local stats type.
	stats
	// Path of log file to tail.
	filePath string
	// Only consider log lines matching this regular expression
	// (compiled lazily in filter from the pattern passed to Start).
	re *regexp.Regexp
	// The glob identifier of the file.
	globID string
	// Channel to send a server message to the dtail client.
	serverMessages chan<- string
	// Closed to signal that tailing of the log file should stop.
	stop chan struct{}
	// Periodically retry reading the file on error?
	retry bool
	// Can I skip messages when there are too many (channel full)?
	canSkipLines bool
	// Seek to the EOF before processing the file?
	seekEOF bool
	// Mutex serializing Stop so the stop channel is closed only once.
	mutex *sync.Mutex
	// Server-wide semaphore bounding concurrently processed files:
	// sending acquires a slot, receiving releases it.
	limiter chan struct{}
}
+
+// FilePath returns the full file path.
+func (f readFile) FilePath() string {
+ return f.filePath
+}
+
+// Retry reading the file on error?
+func (f readFile) Retry() bool {
+ return f.retry
+}
+
+// Start tailing a log file.
+func (f readFile) Start(lines chan<- LineRead, regex string) error {
+ defer func() {
+ select {
+ case <-f.limiter:
+ default:
+ }
+ }()
+
+ select {
+ case f.limiter <- struct{}{}:
+ default:
+ select {
+ case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
+ case <-f.stop:
+ return nil
+ }
+ f.limiter <- struct{}{}
+ }
+
+ fd, err := os.Open(f.filePath)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ if f.seekEOF {
+ fd.Seek(0, io.SeekEnd)
+ }
+
+ rawLines := make(chan []byte, 100)
+ truncate := make(chan struct{})
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+
+ go f.periodicTruncateCheck(truncate)
+ go f.filter(&wg, rawLines, lines, regex)
+
+ err = f.read(fd, rawLines, truncate)
+ close(rawLines)
+ wg.Wait()
+
+ return err
+}
+
+func (f readFile) periodicTruncateCheck(truncate chan struct{}) {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ select {
+ case truncate <- struct{}{}:
+ case <-f.stop:
+ }
+ case <-f.stop:
+ return
+ }
+ }
+}
+
+// Stop reading file.
+func (f readFile) Stop() {
+ f.mutex.Lock()
+ defer f.mutex.Unlock()
+
+ select {
+ case <-f.stop:
+ return
+ default:
+ }
+
+ close(f.stop)
+}
+
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ logger.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ logger.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+
+ return
+}
+
// read consumes the file via a (possibly decompressing) buffered reader and
// sends complete lines to rawLines. It returns nil on EOF (when not
// following), on stop, or a non-nil error on read failure or truncation.
// Over-long lines (>= 1 MiB) are split into multiple lines.
func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
	reader, err := f.makeReader(fd)
	if err != nil {
		return err
	}
	rawLine := make([]byte, 0, 512)
	// Total bytes of line content read so far (excludes appended newlines).
	var offset uint64

	lineLengthThreshold := 1024 * 1024 // 1mb
	// Ensures only one warning is emitted per over-long log line.
	longLineWarning := false

	for {
		// Non-blocking poll: handle a pending truncation check or stop
		// signal before the next (potentially blocking) read.
		select {
		case <-truncate:
			if isTruncated, err := f.truncated(fd); isTruncated {
				return err
			}
			logger.Info(f.filePath, "Current offset", offset)

		case <-f.stop:
			return nil
		default:
		}

		// Read some bytes (max 4k at once as of go 1.12). isPrefix will
		// be set if line does not fit into 4k buffer.
		bytes, isPrefix, err := reader.ReadLine()

		if err != nil {
			// If EOF, sleep a couple of ms and return with nil error.
			// If other error, return with non-nil error.
			if err != io.EOF {
				return err
			}
			if !f.seekEOF {
				logger.Debug(f.FilePath(), "End of file reached")
				return nil
			}
			// Following mode: wait briefly for new data to be appended.
			time.Sleep(time.Millisecond * 100)
			continue
		}

		rawLine = append(rawLine, bytes...)
		offset += uint64(len(bytes))

		if !isPrefix {
			// Last ReadLine call returned content until end of line.
			rawLine = append(rawLine, '\n')
			select {
			case rawLines <- rawLine:
			case <-f.stop:
				return nil
			}
			// Fresh buffer: the old one is now owned by the consumer.
			rawLine = make([]byte, 0, 512)
			if longLineWarning {
				longLineWarning = false
			}
			continue
		}

		// Last ReadLine call could not read content until end of line, buffer
		// was too small. Determine whether we exceed the max line length we
		// want dtail to send to the client at once. Possibly split up log line
		// into multiple log lines.
		if len(rawLine) >= lineLengthThreshold {
			if !longLineWarning {
				f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
				// Only print out one warning per long log line.
				longLineWarning = true
			}
			rawLine = append(rawLine, '\n')
			select {
			case rawLines <- rawLine:
			case <-f.stop:
				return nil
			}
			rawLine = make([]byte, 0, 512)
		}
	}
}
+
+// Filter log lines matching a given regular expression.
+func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) {
+ defer wg.Done()
+
+ if regex == "" {
+ regex = "."
+ }
+
+ re, err := regexp.Compile(regex)
+ if err != nil {
+ logger.Error(regex, "Can't compile regex, using '.' instead", err)
+ re = regexp.MustCompile(".")
+ }
+ f.re = re
+
+ for {
+ select {
+ case line, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+ if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok {
+ select {
+ case lines <- filteredLine:
+ case <-f.stop:
+ return
+ }
+ }
+ }
+ }
+}
+
+func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) {
+ var read LineRead
+
+ if !f.re.Match(line) {
+ f.updateLineNotMatched()
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineMatched()
+
+ // Can we actually send more messages, channel capacity reached?
+ if f.canSkipLines && length >= capacity {
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineTransmitted()
+
+ read = LineRead{
+ Content: line,
+ GlobID: &f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: f.transmittedPerc(),
+ }
+
+ return read, true
+}
+
+// Check wether log file is truncated. Returns nil if not.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+ logger.Debug(f.filePath, "File truncation check")
+
+ // Can not seek currently open FD.
+ curPos, err := fd.Seek(0, os.SEEK_CUR)
+ if err != nil {
+ return true, err
+ }
+
+ // Can not open file at original path.
+ pathFd, err := os.Open(f.filePath)
+ if err != nil {
+ return true, err
+ }
+ defer pathFd.Close()
+
+ // Can not seek file at original path.
+ pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ if err != nil {
+ return true, err
+ }
+
+ if curPos > pathPos {
+ return true, errors.New("File got truncated")
+ }
+
+ return false, nil
+}