summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/datas/rbuffer.go2
-rw-r--r--internal/io/fs/readfile.go58
-rw-r--r--internal/server/handlers/readcommand.go33
3 files changed, 67 insertions, 26 deletions
diff --git a/internal/datas/rbuffer.go b/internal/datas/rbuffer.go
index df8f622..a705a43 100644
--- a/internal/datas/rbuffer.go
+++ b/internal/datas/rbuffer.go
@@ -2,6 +2,8 @@ package datas
import "fmt"
+// TODO: Unused code file, delete it.
+
// RBuffer is a simple circular string ring buffer data structure.
type RBuffer struct {
Capacity int
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index a42fc53..9c2f53c 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -65,17 +65,13 @@ func (f readFile) Retry() bool {
func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
lines chan<- line.Line, re regex.Regex) error {
- dlog.Common.Trace("readFile", f)
-
- fd, err := os.Open(f.filePath)
+ reader, fd, err := f.makeReader()
+ if fd != nil {
+ defer fd.Close()
+ }
if err != nil {
return err
}
- defer fd.Close()
-
- if f.seekEOF {
- fd.Seek(0, io.SeekEnd)
- }
rawLines := make(chan *bytes.Buffer, 100)
truncate := make(chan struct{})
@@ -94,7 +90,7 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
readCancel()
}()
- err = f.read(readCtx, fd, rawLines, truncate)
+ err = f.read(readCtx, fd, reader, rawLines, truncate)
close(rawLines)
// Filter may sends some data still. So wait until it is done here.
filterWg.Wait()
@@ -102,6 +98,36 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
return err
}
+func (f readFile) makeReader() (*bufio.Reader, *os.File, error) {
+ if f.filePath == "PIPE" && f.globID == "PIPE" {
+ return f.makePipeReader()
+ }
+ return f.makeFileReader()
+}
+
+func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) {
+ var reader *bufio.Reader
+ fd, err := os.Open(f.filePath)
+ if err != nil {
+ return reader, fd, err
+ }
+
+ if f.seekEOF {
+ fd.Seek(0, io.SeekEnd)
+ }
+
+ reader, err = f.makeCompressedFileReader(fd)
+ if err != nil {
+ return reader, fd, err
+ }
+
+ return reader, fd, nil
+}
+
+func (f readFile) makePipeReader() (*bufio.Reader, *os.File, error) {
+ return bufio.NewReader(os.Stdin), nil, nil
+}
+
func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
for {
select {
@@ -116,7 +142,7 @@ func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struc
}
}
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) {
switch {
case strings.HasSuffix(f.FilePath(), ".gz"):
fallthrough
@@ -137,14 +163,10 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
return
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer,
- truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
+ rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
var offset uint64
- reader, err := f.makeReader(fd)
- if err != nil {
- return err
- }
lineLengthThreshold := 1024 * 1024 // 1mb
warnedAboutLongLine := false
@@ -387,6 +409,10 @@ func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity
// Check wether log file is truncated. Returns nil if not.
func (f readFile) truncated(fd *os.File) (bool, error) {
+ if fd == nil {
+ return false, nil
+ }
+
dlog.Common.Debug(f.filePath, "File truncation check")
// Can not seek currently open FD.
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 51077fc..bd536de 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,6 +2,7 @@ package handlers
import (
"context"
+ "os"
"path/filepath"
"strings"
"sync"
@@ -45,6 +46,16 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
"Unable to parse command", args, argc))
return
}
+
+ // In serverless mode, can also read data from pipe
+ // e.g.: grep foo bar.log | dmap 'from STATS select ...'
+ if r.isInputFromPipe() {
+ dlog.Server.Debug("Reading data from pipe")
+ r.read(ctx, ltx, "PIPE", "PIPE", re)
+ return
+ }
+
+ dlog.Server.Debug("Reading data from file(s)")
r.readGlob(ctx, ltx, args[1], re, retries)
}
@@ -106,20 +117,13 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
"Unable to read file(s), check server logs"))
return
}
- r.readFile(ctx, ltx, path, globID, re)
+ r.read(ctx, ltx, path, globID, re)
}
-func (*readCommand) limit(ctx context.Context, limiter chan struct{}, message string) {
- select {
- case <-ctx.Done():
- return
- }
-}
-
-func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext,
+func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
- dlog.Server.Info(r.server.user, "Start reading file", path, globID)
+ dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
@@ -206,3 +210,12 @@ func (r *readCommand) makeGlobID(path, glob string) string {
dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
+
+func (r *readCommand) isInputFromPipe() bool {
+ if !r.server.serverless {
+ // Can read from pipe only in serverless mode.
+ return false
+ }
+ fileInfo, _ := os.Stdin.Stat()
+ return fileInfo.Mode()&os.ModeCharDevice == 0
+}