diff options
| -rw-r--r-- | integrationtests/commandutils.go | 21 | ||||
| -rw-r--r-- | integrationtests/dmap_test.go | 56 | ||||
| -rw-r--r-- | integrationtests/dserver_test.go | 4 | ||||
| -rw-r--r-- | integrationtests/dtail_test.go | 4 | ||||
| -rw-r--r-- | integrationtests/dtailhealth_test.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 5 |
7 files changed, 70 insertions, 24 deletions
diff --git a/integrationtests/commandutils.go b/integrationtests/commandutils.go index d5b5987..af898ab 100644 --- a/integrationtests/commandutils.go +++ b/integrationtests/commandutils.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "fmt" + "io" "os" "os/exec" "strings" @@ -47,8 +48,8 @@ func runCommandRetry(ctx context.Context, t *testing.T, retries int, stdoutFile, return } -func startCommand(ctx context.Context, t *testing.T, cmdStr string, - args ...string) (<-chan string, <-chan string, <-chan error, error) { +func startCommand(ctx context.Context, t *testing.T, inPipeFile, + cmdStr string, args ...string) (<-chan string, <-chan string, <-chan error, error) { stdoutCh := make(chan string) stderrCh := make(chan string) @@ -65,12 +66,28 @@ func startCommand(ctx context.Context, t *testing.T, cmdStr string, if err != nil { return stdoutCh, stderrCh, nil, err } + cmdStderr, err := cmd.StderrPipe() err = cmd.Start() if err != nil { return stdoutCh, stderrCh, nil, err } + // Read input file and send to stdin pipe? + if inPipeFile != "" { + t.Log(fmt.Sprintf("Piping %s to stdin pipe", inPipeFile)) + stdinPipe, err := cmd.StdinPipe() + if err != nil { + return stdoutCh, stderrCh, nil, err + } + fd, err := os.Open(inPipeFile) + if err != nil { + return stdoutCh, stderrCh, nil, err + } + defer fd.Close() + go io.Copy(stdinPipe, bufio.NewReader(fd)) + } + go func() { scanner := bufio.NewScanner(cmdStdout) scanner.Split(bufio.ScanLines) diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index 6a93b7b..c60a828 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -14,6 +14,20 @@ func TestDMap(t *testing.T) { t.Log("Skipping") return } + + t.Log("Testing dmap with input file") + if err := testDmap(t, false); err != nil { + t.Log(err) + return + } + t.Log("Testing dmap with stdin input pipe") + if err := testDmap(t, true); err != nil { + t.Log(err) + return + } +} + +func testDmap(t *testing.T, usePipe bool) error { inFile := "mapr_testdata.log" csvFile := "dmap.csv.tmp" expectedCsvFile := "dmap.csv.expected" @@ -27,32 +41,45 @@ func TestDMap(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, "../dmap", - "--cfg", "none", - "--query", query, - "--logger", "stdout", - "--logLevel", "error", - "--noColor", - inFile) + var stdoutCh, stderrCh <-chan string + var cmdErrCh <-chan error + var err error + + if usePipe { + stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, + inFile, "../dmap", + "--cfg", "none", + "--query", query, + "--logger", "stdout", + "--logLevel", "error", + "--noColor") + } else { + stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, + "", "../dmap", + "--cfg", "none", + "--query", query, + "--logger", "stdout", + "--logLevel", "error", + "--noColor", + inFile) + } if err != nil { - t.Error(err) - return + return err } waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { - t.Error(err) - return + return err } if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { - t.Error(err) - return + return err } os.Remove(csvFile) os.Remove(queryFile) + return nil } func TestDMap2(t *testing.T) { @@ -111,7 +138,8 @@ func TestDMap3(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, "../dmap", + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", "--query", query, "--cfg", "none", "--logger", "stdout", diff --git a/integrationtests/dserver_test.go b/integrationtests/dserver_test.go index c985777..27ce773 100644 --- a/integrationtests/dserver_test.go +++ b/integrationtests/dserver_test.go @@ -27,7 +27,7 @@ func TestDServer(t *testing.T) { defer cancel() stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, - "../dserver", + "", "../dserver", "--cfg", "dserver.cfg", "--logger", "stdout", "--logLevel", "info", @@ -94,7 +94,7 @@ func TestDServer2(t *testing.T) { }() stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, - "../dserver", + "", "../dserver", "--cfg", "dserver2.cfg", "--logger", "stdout", "--logLevel", "debug", diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go index 6fa5308..0082843 100644 --- a/integrationtests/dtail_test.go +++ b/integrationtests/dtail_test.go @@ -35,7 +35,7 @@ func TestDTailWithServer(t *testing.T) { }() serverCh, _, _, err := startCommand(ctx, t, - "../dserver", + "", "../dserver", "--cfg", "none", "--logger", "stdout", "--logLevel", "info", @@ -49,7 +49,7 @@ func TestDTailWithServer(t *testing.T) { // MAYBETODO: In testmode, never read a config file (use none for all commands) clientCh, _, _, err := startCommand(ctx, t, - "../dtail", + "", "../dtail", "--cfg", "none", "--logger", "stdout", "--logLevel", "info", diff --git a/integrationtests/dtailhealth_test.go b/integrationtests/dtailhealth_test.go index b53c425..0dd15e8 100644 --- a/integrationtests/dtailhealth_test.go +++ b/integrationtests/dtailhealth_test.go @@ -70,7 +70,7 @@ func TestDTailHealthCheck3(t *testing.T) { defer cancel() _, _, _, err := startCommand(ctx, t, - "../dserver", + "", "../dserver", "--cfg", "none", "--logger", "stdout", "--logLevel", "trace", diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 9c2f53c..18c20c0 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -99,7 +99,7 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, } func (f readFile) makeReader() (*bufio.Reader, *os.File, error) { - if f.filePath == "PIPE" && f.globID == "PIPE" { + if f.filePath == "" && f.globID == "-" { return f.makePipeReader() } return f.makeFileReader() diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index bd536de..6d1b55a 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -50,8 +50,9 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, // In serverless mode, can also read data from pipe // e.g.: grep foo bar.log | dmap 'from STATS select ...' if r.isInputFromPipe() { - dlog.Server.Debug("Reading data from pipe") - r.read(ctx, ltx, "PIPE", "PIPE", re) + dlog.Server.Debug("Reading data from stdin pipe") + // Empty file path and globID "-" represents reading from the stdin pipe. + r.read(ctx, ltx, "", "-", re) return } |
