summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-30 18:14:46 +0300
committerPaul Buetow <paul@buetow.org>2021-10-30 18:14:46 +0300
commit979ae1884ae61d89537c005bbce24d046007a3fa (patch)
tree98762b3cd419ec3b4a7720d5129528b91e94695c /internal/server/handlers/readcommand.go
parent2d1e70eccb3e3a0c9edf4fb859463e0bc8e4c8c9 (diff)
add support to read from stdin pipe in serverless mode, e.g. grep foo.log | dmap "select from ...."
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go33
1 files changed, 23 insertions, 10 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 51077fc..bd536de 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,6 +2,7 @@ package handlers
import (
"context"
+ "os"
"path/filepath"
"strings"
"sync"
@@ -45,6 +46,16 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
"Unable to parse command", args, argc))
return
}
+
+ // In serverless mode, can also read data from pipe
+ // e.g.: grep foo bar.log | dmap 'from STATS select ...'
+ if r.isInputFromPipe() {
+ dlog.Server.Debug("Reading data from pipe")
+ r.read(ctx, ltx, "PIPE", "PIPE", re)
+ return
+ }
+
+ dlog.Server.Debug("Reading data from file(s)")
r.readGlob(ctx, ltx, args[1], re, retries)
}
@@ -106,20 +117,13 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
"Unable to read file(s), check server logs"))
return
}
- r.readFile(ctx, ltx, path, globID, re)
+ r.read(ctx, ltx, path, globID, re)
}
-func (*readCommand) limit(ctx context.Context, limiter chan struct{}, message string) {
- select {
- case <-ctx.Done():
- return
- }
-}
-
-func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext,
+func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
- dlog.Server.Info(r.server.user, "Start reading file", path, globID)
+ dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
@@ -206,3 +210,12 @@ func (r *readCommand) makeGlobID(path, glob string) string {
dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
+
+func (r *readCommand) isInputFromPipe() bool {
+ if !r.server.serverless {
+ // Can read from pipe only in serverless mode.
+ return false
+ }
+ fileInfo, _ := os.Stdin.Stat()
+ return fileInfo.Mode()&os.ModeCharDevice == 0
+}