diff options
| -rw-r--r-- | internal/datas/rbuffer.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 58 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 33 |
3 files changed, 67 insertions, 26 deletions
diff --git a/internal/datas/rbuffer.go b/internal/datas/rbuffer.go index df8f622..a705a43 100644 --- a/internal/datas/rbuffer.go +++ b/internal/datas/rbuffer.go @@ -2,6 +2,8 @@ package datas import "fmt" +// TODO: Unused code file, delete it. + // RBuffer is a simple circular string ring buffer data structure. type RBuffer struct { Capacity int diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index a42fc53..9c2f53c 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -65,17 +65,13 @@ func (f readFile) Retry() bool { func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { - dlog.Common.Trace("readFile", f) - - fd, err := os.Open(f.filePath) + reader, fd, err := f.makeReader() + if fd != nil { + defer fd.Close() + } if err != nil { return err } - defer fd.Close() - - if f.seekEOF { - fd.Seek(0, io.SeekEnd) - } rawLines := make(chan *bytes.Buffer, 100) truncate := make(chan struct{}) @@ -94,7 +90,7 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, readCancel() }() - err = f.read(readCtx, fd, rawLines, truncate) + err = f.read(readCtx, fd, reader, rawLines, truncate) close(rawLines) // Filter may sends some data still. So wait until it is done here. filterWg.Wait() @@ -102,6 +98,36 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, return err } +func (f readFile) makeReader() (*bufio.Reader, *os.File, error) { + if f.filePath == "PIPE" && f.globID == "PIPE" { + return f.makePipeReader() + } + return f.makeFileReader() +} + +func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) { + var reader *bufio.Reader + fd, err := os.Open(f.filePath) + if err != nil { + return reader, fd, err + } + + if f.seekEOF { + fd.Seek(0, io.SeekEnd) + } + + reader, err = f.makeCompressedFileReader(fd) + if err != nil { + return reader, fd, err + } + + return reader, fd, nil +} + +func (f readFile) makePipeReader() (*bufio.Reader, *os.File, error) { + return bufio.NewReader(os.Stdin), nil, nil +} + func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { for { select { @@ -116,7 +142,7 @@ func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struc } } -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { +func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { switch { case strings.HasSuffix(f.FilePath(), ".gz"): fallthrough @@ -137,14 +163,10 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { return } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, - truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, + rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 - reader, err := f.makeReader(fd) - if err != nil { - return err - } lineLengthThreshold := 1024 * 1024 // 1mb warnedAboutLongLine := false @@ -387,6 +409,10 @@ func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity // Check wether log file is truncated. Returns nil if not. func (f readFile) truncated(fd *os.File) (bool, error) { + if fd == nil { + return false, nil + } + dlog.Common.Debug(f.filePath, "File truncation check") // Can not seek currently open FD. diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 51077fc..bd536de 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "os" "path/filepath" "strings" "sync" @@ -45,6 +46,16 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, "Unable to parse command", args, argc)) return } + + // In serverless mode, can also read data from pipe + // e.g.: grep foo bar.log | dmap 'from STATS select ...' + if r.isInputFromPipe() { + dlog.Server.Debug("Reading data from pipe") + r.read(ctx, ltx, "PIPE", "PIPE", re) + return + } + + dlog.Server.Debug("Reading data from file(s)") r.readGlob(ctx, ltx, args[1], re, retries) } @@ -106,20 +117,13 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC "Unable to read file(s), check server logs")) return } - r.readFile(ctx, ltx, path, globID, re) + r.read(ctx, ltx, path, globID, re) } -func (*readCommand) limit(ctx context.Context, limiter chan struct{}, message string) { - select { - case <-ctx.Done(): - return - } -} - -func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext, +func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, path, globID string, re regex.Regex) { - dlog.Server.Info(r.server.user, "Start reading file", path, globID) + dlog.Server.Info(r.server.user, "Start reading", path, globID) var reader fs.FileReader var limiter chan struct{} @@ -206,3 +210,12 @@ func (r *readCommand) makeGlobID(path, glob string) string { dlog.Server.Warn("Empty file path given?", path, glob)) return "" } + +func (r *readCommand) isInputFromPipe() bool { + if !r.server.serverless { + // Can read from pipe only in serverless mode. + return false + } + fileInfo, _ := os.Stdin.Stat() + return fileInfo.Mode()&os.ModeCharDevice == 0 +} |
