summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--integrationtests/commandutils.go21
-rw-r--r--integrationtests/dmap_test.go56
-rw-r--r--integrationtests/dserver_test.go4
-rw-r--r--integrationtests/dtail_test.go4
-rw-r--r--integrationtests/dtailhealth_test.go2
-rw-r--r--internal/io/fs/readfile.go2
-rw-r--r--internal/server/handlers/readcommand.go5
7 files changed, 70 insertions, 24 deletions
diff --git a/integrationtests/commandutils.go b/integrationtests/commandutils.go
index d5b5987..af898ab 100644
--- a/integrationtests/commandutils.go
+++ b/integrationtests/commandutils.go
@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"fmt"
+ "io"
"os"
"os/exec"
"strings"
@@ -47,8 +48,8 @@ func runCommandRetry(ctx context.Context, t *testing.T, retries int, stdoutFile,
return
}
-func startCommand(ctx context.Context, t *testing.T, cmdStr string,
- args ...string) (<-chan string, <-chan string, <-chan error, error) {
+func startCommand(ctx context.Context, t *testing.T, inPipeFile,
+ cmdStr string, args ...string) (<-chan string, <-chan string, <-chan error, error) {
stdoutCh := make(chan string)
stderrCh := make(chan string)
@@ -65,12 +66,28 @@ func startCommand(ctx context.Context, t *testing.T, cmdStr string,
if err != nil {
return stdoutCh, stderrCh, nil, err
}
+
cmdStderr, err := cmd.StderrPipe()
err = cmd.Start()
if err != nil {
return stdoutCh, stderrCh, nil, err
}
+ // Read input file and send to stdin pipe?
+ if inPipeFile != "" {
+ t.Log(fmt.Sprintf("Piping %s to stdin pipe", inPipeFile))
+ stdinPipe, err := cmd.StdinPipe()
+ if err != nil {
+ return stdoutCh, stderrCh, nil, err
+ }
+ fd, err := os.Open(inPipeFile)
+ if err != nil {
+ return stdoutCh, stderrCh, nil, err
+ }
+ defer fd.Close()
+ go io.Copy(stdinPipe, bufio.NewReader(fd))
+ }
+
go func() {
scanner := bufio.NewScanner(cmdStdout)
scanner.Split(bufio.ScanLines)
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go
index 6a93b7b..c60a828 100644
--- a/integrationtests/dmap_test.go
+++ b/integrationtests/dmap_test.go
@@ -14,6 +14,20 @@ func TestDMap(t *testing.T) {
t.Log("Skipping")
return
}
+
+ t.Log("Testing dmap with input file")
+ if err := testDmap(t, false); err != nil {
+ t.Log(err)
+ return
+ }
+ t.Log("Testing dmap with stdin input pipe")
+ if err := testDmap(t, true); err != nil {
+ t.Log(err)
+ return
+ }
+}
+
+func testDmap(t *testing.T, usePipe bool) error {
inFile := "mapr_testdata.log"
csvFile := "dmap.csv.tmp"
expectedCsvFile := "dmap.csv.expected"
@@ -27,32 +41,45 @@ func TestDMap(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, "../dmap",
- "--cfg", "none",
- "--query", query,
- "--logger", "stdout",
- "--logLevel", "error",
- "--noColor",
- inFile)
+ var stdoutCh, stderrCh <-chan string
+ var cmdErrCh <-chan error
+ var err error
+
+ if usePipe {
+ stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
+ inFile, "../dmap",
+ "--cfg", "none",
+ "--query", query,
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--noColor")
+ } else {
+ stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
+ "", "../dmap",
+ "--cfg", "none",
+ "--query", query,
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--noColor",
+ inFile)
+ }
if err != nil {
- t.Error(err)
- return
+ return err
}
waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
- t.Error(err)
- return
+ return err
}
if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
- t.Error(err)
- return
+ return err
}
os.Remove(csvFile)
os.Remove(queryFile)
+ return nil
}
func TestDMap2(t *testing.T) {
@@ -111,7 +138,8 @@ func TestDMap3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, "../dmap",
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
"--query", query,
"--cfg", "none",
"--logger", "stdout",
diff --git a/integrationtests/dserver_test.go b/integrationtests/dserver_test.go
index c985777..27ce773 100644
--- a/integrationtests/dserver_test.go
+++ b/integrationtests/dserver_test.go
@@ -27,7 +27,7 @@ func TestDServer(t *testing.T) {
defer cancel()
stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
- "../dserver",
+ "", "../dserver",
"--cfg", "dserver.cfg",
"--logger", "stdout",
"--logLevel", "info",
@@ -94,7 +94,7 @@ func TestDServer2(t *testing.T) {
}()
stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
- "../dserver",
+ "", "../dserver",
"--cfg", "dserver2.cfg",
"--logger", "stdout",
"--logLevel", "debug",
diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go
index 6fa5308..0082843 100644
--- a/integrationtests/dtail_test.go
+++ b/integrationtests/dtail_test.go
@@ -35,7 +35,7 @@ func TestDTailWithServer(t *testing.T) {
}()
serverCh, _, _, err := startCommand(ctx, t,
- "../dserver",
+ "", "../dserver",
"--cfg", "none",
"--logger", "stdout",
"--logLevel", "info",
@@ -49,7 +49,7 @@ func TestDTailWithServer(t *testing.T) {
// MAYBETODO: In testmode, never read a config file (use none for all commands)
clientCh, _, _, err := startCommand(ctx, t,
- "../dtail",
+ "", "../dtail",
"--cfg", "none",
"--logger", "stdout",
"--logLevel", "info",
diff --git a/integrationtests/dtailhealth_test.go b/integrationtests/dtailhealth_test.go
index b53c425..0dd15e8 100644
--- a/integrationtests/dtailhealth_test.go
+++ b/integrationtests/dtailhealth_test.go
@@ -70,7 +70,7 @@ func TestDTailHealthCheck3(t *testing.T) {
defer cancel()
_, _, _, err := startCommand(ctx, t,
- "../dserver",
+ "", "../dserver",
"--cfg", "none",
"--logger", "stdout",
"--logLevel", "trace",
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 9c2f53c..18c20c0 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -99,7 +99,7 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
}
func (f readFile) makeReader() (*bufio.Reader, *os.File, error) {
- if f.filePath == "PIPE" && f.globID == "PIPE" {
+ if f.filePath == "" && f.globID == "-" {
return f.makePipeReader()
}
return f.makeFileReader()
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index bd536de..6d1b55a 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -50,8 +50,9 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
// In serverless mode, can also read data from pipe
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
if r.isInputFromPipe() {
- dlog.Server.Debug("Reading data from pipe")
- r.read(ctx, ltx, "PIPE", "PIPE", re)
+ dlog.Server.Debug("Reading data from stdin pipe")
+ // Empty file path and globID "-" represents reading from the stdin pipe.
+ r.read(ctx, ltx, "", "-", re)
return
}