diff options
Diffstat (limited to 'fs/readfile.go')
| -rw-r--r-- | fs/readfile.go | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/fs/readfile.go b/fs/readfile.go new file mode 100644 index 0000000..375378b --- /dev/null +++ b/fs/readfile.go @@ -0,0 +1,318 @@ +package fs + +import ( + "bufio" + "compress/gzip" + "dtail/logger" + "errors" + "io" + "os" + "regexp" + "strings" + "sync" + "time" + + "github.com/DataDog/zstd" +) + +// Used to tail and filter a local log file. +type readFile struct { + // Various statistics (e.g. regex hit percentage, transfer percentage). + stats + // Path of log file to tail. + filePath string + // Only consider all log lines matching this regular expression. + re *regexp.Regexp + // The glob identifier of the file. + globID string + // Channel to send a server message to the dtail client + serverMessages chan<- string + // Signals to stop tailing the log file. + stop chan struct{} + // Periodically retry reading file. + retry bool + // Can I skip messages when there are too many? + canSkipLines bool + // Seek to the EOF before processing file? + seekEOF bool + // Mutex to control the stopping of the file + mutex *sync.Mutex + limiter chan struct{} +} + +// FilePath returns the full file path. +func (f readFile) FilePath() string { + return f.filePath +} + +// Retry reading the file on error? +func (f readFile) Retry() bool { + return f.retry +} + +// Start tailing a log file. +func (f readFile) Start(lines chan<- LineRead, regex string) error { + defer func() { + select { + case <-f.limiter: + default: + } + }() + + select { + case f.limiter <- struct{}{}: + default: + select { + case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case <-f.stop: + return nil + } + f.limiter <- struct{}{} + } + + fd, err := os.Open(f.filePath) + if err != nil { + return err + } + defer fd.Close() + + if f.seekEOF { + fd.Seek(0, io.SeekEnd) + } + + rawLines := make(chan []byte, 100) + truncate := make(chan struct{}) + + var wg sync.WaitGroup + wg.Add(1) + + go f.periodicTruncateCheck(truncate) + go f.filter(&wg, rawLines, lines, regex) + + err = f.read(fd, rawLines, truncate) + close(rawLines) + wg.Wait() + + return err +} + +func (f readFile) periodicTruncateCheck(truncate chan struct{}) { + for { + select { + case <-time.After(time.Second * 3): + select { + case truncate <- struct{}{}: + case <-f.stop: + } + case <-f.stop: + return + } + } +} + +// Stop reading file. +func (f readFile) Stop() { + f.mutex.Lock() + defer f.mutex.Unlock() + + select { + case <-f.stop: + return + default: + } + + close(f.stop) +} + +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + +func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { + reader, err := f.makeReader(fd) + if err != nil { + return err + } + rawLine := make([]byte, 0, 512) + var offset uint64 + + lineLengthThreshold := 1024 * 1024 // 1mb + longLineWarning := false + + for { + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + logger.Info(f.filePath, "Current offset", offset) + + case <-f.stop: + return nil + default: + } + + // Read some bytes (max 4k at once as of go 1.12). isPrefix will + // be set if line does not fit into 4k buffer. + bytes, isPrefix, err := reader.ReadLine() + + if err != nil { + // If EOF, sleep a couple of ms and return with nil error. + // If other error, return with non-nil error. + if err != io.EOF { + return err + } + if !f.seekEOF { + logger.Debug(f.FilePath(), "End of file reached") + return nil + } + time.Sleep(time.Millisecond * 100) + continue + } + + rawLine = append(rawLine, bytes...) + offset += uint64(len(bytes)) + + if !isPrefix { + // last LineRead call returned contend until end of line. + rawLine = append(rawLine, '\n') + select { + case rawLines <- rawLine: + case <-f.stop: + return nil + } + rawLine = make([]byte, 0, 512) + if longLineWarning { + longLineWarning = false + } + continue + } + + // Last LineRead call could not read content until end of line, buffer + // was too small. Determine whether we exceed the max line length we + // want dtail to send to the client at once. Possibly split up log line + // into multiple log lines. + if len(rawLine) >= lineLengthThreshold { + if !longLineWarning { + f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") + // Only print out one warning per long log line. + longLineWarning = true + } + rawLine = append(rawLine, '\n') + select { + case rawLines <- rawLine: + case <-f.stop: + return nil + } + rawLine = make([]byte, 0, 512) + } + } +} + +// Filter log lines matching a given regular expression. +func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) { + defer wg.Done() + + if regex == "" { + regex = "." + } + + re, err := regexp.Compile(regex) + if err != nil { + logger.Error(regex, "Can't compile regex, using '.' instead", err) + re = regexp.MustCompile(".") + } + f.re = re + + for { + select { + case line, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok { + select { + case lines <- filteredLine: + case <-f.stop: + return + } + } + } + } +} + +func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) { + var read LineRead + + if !f.re.Match(line) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + return read, false + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + return read, false + } + f.updateLineTransmitted() + + read = LineRead{ + Content: line, + GlobID: &f.globID, + Count: f.totalLineCount(), + TransmittedPerc: f.transmittedPerc(), + } + + return read, true +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + logger.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + + if curPos > pathPos { + return true, errors.New("File got truncated") + } + + return false, nil +} |
