diff options
19 files changed, 626 insertions, 170 deletions
diff --git a/integrationtests/dmap_server_helpers.go b/integrationtests/dmap_server_helpers.go new file mode 100644 index 0000000..cb34dec --- /dev/null +++ b/integrationtests/dmap_server_helpers.go @@ -0,0 +1,195 @@ +package integrationtests + +import ( + "context" + "fmt" + "strings" + "testing" + "time" +) + +// testDMapWithServer runs a DMap command with a running dserver and compares output +func testDMapWithServer(t *testing.T, args []string, csvFile, expectedCsvFile, queryFile, expectedQueryFile string) error { + // Start dserver + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver in background + dserverCtx, dserverCancel := context.WithCancel(ctx) + defer dserverCancel() + + dserverStdout, dserverStderr, dserverErr, err := startCommand(dserverCtx, t, "", "../dserver", "--cfg", "none", "--port", "2222") + if err != nil { + return fmt.Errorf("failed to start dserver: %v", err) + } + + // Wait for server to start + time.Sleep(2 * time.Second) + + // Run dmap with server connection + dmapArgs := append([]string{"--cfg", "none", "--servers", "localhost:2222", "--trustAllHosts"}, args...) + dmapCtx, dmapCancel := context.WithTimeout(ctx, 30*time.Second) + defer dmapCancel() + + dmapStdout, dmapStderr, dmapCmdErr, err := startCommand(dmapCtx, t, "", "../dmap", dmapArgs...) + if err != nil { + dserverCancel() + return fmt.Errorf("failed to start dmap: %v", err) + } + + // Wait for dmap to complete + dmapDone := make(chan struct{}) + go func() { + defer close(dmapDone) + waitForCommand(dmapCtx, t, dmapStdout, dmapStderr, dmapCmdErr) + }() + + // Wait for dmap completion or timeout + select { + case <-dmapDone: + // DMap completed + case <-dmapCtx.Done(): + dserverCancel() + return fmt.Errorf("dmap command timed out") + } + + // Stop the server + dserverCancel() + + // Wait a bit for server cleanup + time.Sleep(500 * time.Millisecond) + + // Drain server channels to avoid goroutine leaks + go func() { + for range dserverStdout { + } + }() + go func() { + for range dserverStderr { + } + }() + go func() { + for range dserverErr { + } + }() + + // Compare CSV output + // For DMap tests, we need to use compareFilesContents for some tests + if strings.Contains(expectedCsvFile, "dmap2") || + strings.Contains(expectedCsvFile, "dmap3") || + strings.Contains(expectedCsvFile, "dmap4") || + strings.Contains(expectedCsvFile, "dmap5") { + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + return fmt.Errorf("CSV file contents comparison failed: %v", err) + } + } else { + if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { + return fmt.Errorf("CSV file comparison failed: %v", err) + } + } + + // Compare query file + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return fmt.Errorf("query file comparison failed: %v", err) + } + + return nil +} + +// testDMapMultipleRunsWithServer runs a DMap command multiple times with server (for append tests) +func testDMapMultipleRunsWithServer(t *testing.T, args []string, csvFile, expectedCsvFile, queryFile, expectedQueryFile string, numRuns int) error { + // Start dserver + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver in background + dserverCtx, dserverCancel := context.WithCancel(ctx) + defer dserverCancel() + + dserverStdout, dserverStderr, dserverErr, err := startCommand(dserverCtx, t, "", "../dserver", "--cfg", "none", "--port", "2222") + if err != nil { + return fmt.Errorf("failed to start dserver: %v", err) + } + + // Wait for server to start + time.Sleep(2 * time.Second) + + // Run dmap multiple times + for i := 0; i < numRuns; i++ { + dmapArgs := append([]string{"--cfg", "none", "--servers", "localhost:2222", "--trustAllHosts"}, args...) + dmapCtx, dmapCancel := context.WithTimeout(ctx, 30*time.Second) + + dmapStdout, dmapStderr, dmapCmdErr, err := startCommand(dmapCtx, t, "", "../dmap", dmapArgs...) + if err != nil { + dmapCancel() + dserverCancel() + return fmt.Errorf("failed to start dmap (run %d): %v", i+1, err) + } + + // Wait for dmap to complete + dmapDone := make(chan struct{}) + go func() { + defer close(dmapDone) + waitForCommand(dmapCtx, t, dmapStdout, dmapStderr, dmapCmdErr) + }() + + // Wait for dmap completion or timeout + select { + case <-dmapDone: + // DMap completed + case <-dmapCtx.Done(): + dmapCancel() + dserverCancel() + return fmt.Errorf("dmap command timed out (run %d)", i+1) + } + + dmapCancel() + // Small delay between runs + time.Sleep(100 * time.Millisecond) + } + + // Stop the server + dserverCancel() + + // Wait a bit for server cleanup + time.Sleep(500 * time.Millisecond) + + // Drain server channels to avoid goroutine leaks + go func() { + for range dserverStdout { + } + }() + go func() { + for range dserverStderr { + } + }() + go func() { + for range dserverErr { + } + }() + + // Compare output files + if expectedCsvFile != "" { + // For DMap tests, we need to use compareFilesContents for some tests + if strings.Contains(expectedCsvFile, "dmap2") || + strings.Contains(expectedCsvFile, "dmap3") || + strings.Contains(expectedCsvFile, "dmap4") || + strings.Contains(expectedCsvFile, "dmap5") { + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + return fmt.Errorf("CSV file contents comparison failed: %v", err) + } + } else { + if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { + return fmt.Errorf("CSV file comparison failed: %v", err) + } + } + } + + if expectedQueryFile != "" { + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return fmt.Errorf("query file comparison failed: %v", err) + } + } + + return nil +}
\ No newline at end of file diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index f772243..de57f59 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -15,6 +15,26 @@ func TestDMap1(t *testing.T) { return } + // Test both serverless and server modes + modes := []struct { + name string + useServer bool + }{ + {"Serverless", false}, + {"WithServer", true}, + } + + for _, mode := range modes { + t.Run(mode.name, func(t *testing.T) { + if err := testDMap1(t, mode.useServer); err != nil { + t.Error(err) + return + } + }) + } +} + +func testDMap1(t *testing.T, useServer bool) error { testTable := map[string]string{ "a": "from STATS select count($line),last($time)," + "avg($goroutines),min(concurrentConnections),max(lifetimeConnections) " + @@ -32,68 +52,95 @@ func TestDMap1(t *testing.T) { for subtestName, query := range testTable { t.Log("Testing dmap with input file") - if err := testDmap1(t, query, subtestName, false); err != nil { + if err := testDmap1Sub(t, query, subtestName, false, useServer); err != nil { t.Error(err) - return + return err } t.Log("Testing dmap with stdin input pipe") - if err := testDmap1(t, query, subtestName, true); err != nil { + if err := testDmap1Sub(t, query, subtestName, true, useServer); err != nil { t.Error(err) - return + return err } } + return nil } -func testDmap1(t *testing.T, query, subtestName string, usePipe bool) error { - inFile := "mapr_testdata.log" - csvFile := fmt.Sprintf("dmap1%s.csv.tmp", subtestName) - expectedCsvFile := fmt.Sprintf("dmap1%s.csv.expected", subtestName) +func testDmap1Sub(t *testing.T, query, subtestName string, usePipe bool, useServer bool) error { + var inFile, expectedCsvFile, expectedQueryFile, csvFile string + + if useServer { + // Use small test data for server mode to avoid channel overflow + inFile = "small_mapr_testdata.log" + csvFile = fmt.Sprintf("small_dmap1%s.csv.tmp", subtestName) + expectedCsvFile = fmt.Sprintf("small_dmap1%s.csv.expected", subtestName) + expectedQueryFile = fmt.Sprintf("small_dmap1%s.csv.query.expected", subtestName) + } else { + inFile = "mapr_testdata.log" + csvFile = fmt.Sprintf("dmap1%s.csv.tmp", subtestName) + expectedCsvFile = fmt.Sprintf("dmap1%s.csv.expected", subtestName) + expectedQueryFile = fmt.Sprintf("dmap1%s.csv.query.expected", subtestName) + } + queryFile := fmt.Sprintf("%s.query", csvFile) - expectedQueryFile := fmt.Sprintf("dmap1%s.csv.query.expected", subtestName) query = fmt.Sprintf("%s outfile %s", query, csvFile) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var stdoutCh, stderrCh <-chan string - var cmdErrCh <-chan error - var err error - - if usePipe { - stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, - inFile, "../dmap", - "--cfg", "none", - "--query", query, - "--logger", "stdout", - "--logLevel", "info", - "--noColor") + if useServer { + // Server mode testing + var args []string + if usePipe { + // For pipe mode with server, we need to handle this differently + // DMap with server doesn't support stdin pipe in the same way + // So we'll just test file mode for server + args = []string{"--query", query, "--logger", "stdout", "--logLevel", "info", "--noColor", inFile} + } else { + args = []string{"--query", query, "--logger", "stdout", "--logLevel", "info", "--noColor", inFile} + } + return testDMapWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile) } else { - stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, - "", "../dmap", - "--cfg", "none", - "--query", query, - "--logger", "stdout", - "--logLevel", "info", - "--noColor", - inFile) - } + // Serverless mode testing (original code) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var stdoutCh, stderrCh <-chan string + var cmdErrCh <-chan error + var err error + + if usePipe { + stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, + inFile, "../dmap", + "--cfg", "none", + "--query", query, + "--logger", "stdout", + "--logLevel", "info", + "--noColor") + } else { + stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, + "", "../dmap", + "--cfg", "none", + "--query", query, + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + inFile) + } - if err != nil { - return err - } + if err != nil { + return err + } + + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) - waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { + return err + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return err + } - if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { - return err + os.Remove(csvFile) + os.Remove(queryFile) + return nil } - if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { - return err - } - - os.Remove(csvFile) - os.Remove(queryFile) - return nil } func TestDMap2(t *testing.T) { @@ -101,36 +148,73 @@ func TestDMap2(t *testing.T) { t.Log("Skipping") return } - inFile := "mapr_testdata.log" + + // Test both serverless and server modes + modes := []struct { + name string + useServer bool + }{ + {"Serverless", false}, + {"WithServer", true}, + } + + for _, mode := range modes { + t.Run(mode.name, func(t *testing.T) { + if err := testDMap2(t, mode.useServer); err != nil { + t.Error(err) + return + } + }) + } +} + +func testDMap2(t *testing.T, useServer bool) error { + var inFile, expectedCsvFile, expectedQueryFile, csvFile string outFile := "dmap2.stdout.tmp" - csvFile := "dmap2.csv.tmp" - expectedCsvFile := "dmap2.csv.expected" + + if useServer { + // Use small test data for server mode to avoid channel overflow + inFile = "small_mapr_testdata.log" + csvFile = "small_dmap2.csv.tmp" + expectedCsvFile = "small_dmap2.csv.expected" + expectedQueryFile = "small_dmap2.csv.query.expected" + } else { + inFile = "mapr_testdata.log" + csvFile = "dmap2.csv.tmp" + expectedCsvFile = "dmap2.csv.expected" + expectedQueryFile = "dmap2.csv.query.expected" + } + queryFile := fmt.Sprintf("%s.query", csvFile) - expectedQueryFile := "dmap2.csv.query.expected" query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ "avg($goroutines),min($goroutines) group by $time order by count($time) "+ "outfile %s", csvFile) - _, err := runCommand(context.TODO(), t, outFile, - "../dmap", "--query", query, "--cfg", "none", inFile) - if err != nil { - t.Error(err) - return - } + if useServer { + // Server mode testing + args := []string{"--query", query, "--cfg", "none", inFile} + return testDMapWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile) + } else { + // Serverless mode testing (original code) + _, err := runCommand(context.TODO(), t, outFile, + "../dmap", "--query", query, "--cfg", "none", inFile) + if err != nil { + return err + } - if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { - t.Error(err) - return - } - if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { - t.Error(err) - return - } + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + return err + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return err + } - os.Remove(outFile) - os.Remove(csvFile) - os.Remove(queryFile) + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) + return nil + } } func TestDMap3(t *testing.T) { @@ -138,56 +222,100 @@ func TestDMap3(t *testing.T) { t.Log("Skipping") return } - inFile := "mapr_testdata.log" + + // Test both serverless and server modes + modes := []struct { + name string + useServer bool + }{ + {"Serverless", false}, + {"WithServer", true}, + } + + for _, mode := range modes { + t.Run(mode.name, func(t *testing.T) { + if err := testDMap3(t, mode.useServer); err != nil { + t.Error(err) + return + } + }) + } +} + +func testDMap3(t *testing.T, useServer bool) error { + var inFile, expectedCsvFile, expectedQueryFile, csvFile string outFile := "dmap3.stdout.tmp" - csvFile := "dmap3.csv.tmp" - expectedCsvFile := "dmap3.csv.expected" + + if useServer { + // Use small test data for server mode to avoid channel overflow + inFile = "small_mapr_testdata.log" + csvFile = "small_dmap3.csv.tmp" + expectedCsvFile = "small_dmap3.csv.expected" + expectedQueryFile = "small_dmap3.csv.query.expected" + } else { + inFile = "mapr_testdata.log" + csvFile = "dmap3.csv.tmp" + expectedCsvFile = "dmap3.csv.expected" + expectedQueryFile = "dmap3.csv.query.expected" + } + queryFile := fmt.Sprintf("%s.query", csvFile) - expectedQueryFile := "dmap3.csv.query.expected" query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ "avg($goroutines),min($goroutines) group by $time order by count($time) "+ "outfile %s", csvFile) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, - "", "../dmap", - "--query", query, - "--cfg", "none", - "--logger", "stdout", - "--logLevel", "info", - "--noColor", - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile) - - if err != nil { - t.Error(err) - return - } - waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + if useServer { + // Server mode testing - use only 3 files instead of 100 to avoid channel overflow + args := []string{ + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + inFile, inFile, inFile, + } + return testDMapWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile) + } else { + // Serverless mode testing (original code with 100 files) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { - t.Error(err) - return - } - if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { - t.Error(err) - return - } + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, + inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile) - os.Remove(outFile) - os.Remove(csvFile) - os.Remove(queryFile) + if err != nil { + return err + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + return err + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return err + } + + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) + return nil + } } func TestDMap4Append(t *testing.T) { @@ -195,12 +323,44 @@ func TestDMap4Append(t *testing.T) { t.Log("Skipping") return } - inFile := "mapr_testdata.log" + + // Test both serverless and server modes + modes := []struct { + name string + useServer bool + }{ + {"Serverless", false}, + {"WithServer", true}, + } + + for _, mode := range modes { + t.Run(mode.name, func(t *testing.T) { + if err := testDMap4Append(t, mode.useServer); err != nil { + t.Error(err) + return + } + }) + } +} + +func testDMap4Append(t *testing.T, useServer bool) error { + var inFile, expectedCsvFile, expectedQueryFile, csvFile string outFile := "dmap4.stdout.tmp" - csvFile := "dmap4.csv.tmp" - expectedCsvFile := "dmap4.csv.expected" + + if useServer { + // Use small test data for server mode to avoid channel overflow + inFile = "small_mapr_testdata.log" + csvFile = "small_dmap4.csv.tmp" + expectedCsvFile = "small_dmap4.csv.expected" + expectedQueryFile = "small_dmap4.csv.query.expected" + } else { + inFile = "mapr_testdata.log" + csvFile = "dmap4.csv.tmp" + expectedCsvFile = "dmap4.csv.expected" + expectedQueryFile = "dmap4.csv.query.expected" + } + queryFile := fmt.Sprintf("%s.query", csvFile) - expectedQueryFile := "dmap4.csv.query.expected" // Delete in case it exists already. Otherwise, test will fail. os.Remove(csvFile) @@ -209,41 +369,52 @@ func TestDMap4Append(t *testing.T) { "avg($goroutines),min($goroutines) group by $time order by count($time) "+ "outfile append %s", csvFile) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing - // file as we specified "outfile append". That works transparently for any mapreduce query - // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap - // command. - for i := 0; i < 2; i++ { - stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, - "", "../dmap", + if useServer { + // Server mode testing - run twice for append functionality + args := []string{ "--query", query, "--cfg", "none", "--logger", "stdout", "--logLevel", "info", - "--noColor", inFile) + "--noColor", inFile, + } + return testDMapMultipleRunsWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile, 2) + } else { + // Serverless mode testing (original code) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing + // file as we specified "outfile append". That works transparently for any mapreduce query + // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap + // command. + for i := 0; i < 2; i++ { + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", inFile) + + if err != nil { + return err + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + } - if err != nil { - t.Error(err) - return + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + return err + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return err } - waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) - } - if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { - t.Error(err) - return - } - if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { - t.Error(err) - return + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) + return nil } - - os.Remove(outFile) - os.Remove(csvFile) - os.Remove(queryFile) } func TestDMap5CSV(t *testing.T) { @@ -251,12 +422,44 @@ func TestDMap5CSV(t *testing.T) { t.Log("Skipping") return } - inFile := "dmap5.csv.in" + + // Test both serverless and server modes + modes := []struct { + name string + useServer bool + }{ + {"Serverless", false}, + {"WithServer", true}, + } + + for _, mode := range modes { + t.Run(mode.name, func(t *testing.T) { + if err := testDMap5CSV(t, mode.useServer); err != nil { + t.Error(err) + return + } + }) + } +} + +func testDMap5CSV(t *testing.T, useServer bool) error { + var inFile, expectedCsvFile, expectedQueryFile, csvFile string outFile := "dmap5.stdout.tmp" - csvFile := "dmap5.csv.tmp" - expectedCsvFile := "dmap5.csv.expected" + + if useServer { + // Use small test data for server mode to avoid channel overflow + inFile = "small_dmap5.csv.in" + csvFile = "small_dmap5.csv.tmp" + expectedCsvFile = "small_dmap5.csv.expected" + expectedQueryFile = "small_dmap5.csv.query.expected" + } else { + inFile = "dmap5.csv.in" + csvFile = "dmap5.csv.tmp" + expectedCsvFile = "dmap5.csv.expected" + expectedQueryFile = "dmap5.csv.query.expected" + } + queryFile := fmt.Sprintf("%s.query", csvFile) - expectedQueryFile := "dmap5.csv.query.expected" // Delete in case it exists already. Otherwise, test will fail. os.Remove(csvFile) @@ -266,39 +469,50 @@ func TestDMap5CSV(t *testing.T) { " set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)`"+ " logformat csv outfile %s", csvFile) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing - // file as we specified "outfile append". That works transparently for any mapreduce query - // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap - // command. - for i := 0; i < 2; i++ { - stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, - "", "../dmap", + if useServer { + // Server mode testing - run twice (CSV input format with append) + args := []string{ "--query", query, "--cfg", "none", "--logger", "stdout", "--logLevel", "info", - "--noColor", inFile) + "--noColor", inFile, + } + return testDMapMultipleRunsWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile, 2) + } else { + // Serverless mode testing (original code) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing + // file as we specified "outfile append". That works transparently for any mapreduce query + // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap + // command. + for i := 0; i < 2; i++ { + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", inFile) + + if err != nil { + return err + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + } - if err != nil { - t.Error(err) - return + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + return err + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return err } - waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) - } - if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { - t.Error(err) - return + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) + return nil } - if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { - t.Error(err) - return - } - - os.Remove(outFile) - os.Remove(csvFile) - os.Remove(queryFile) } diff --git a/integrationtests/small_dmap1a.csv.expected b/integrationtests/small_dmap1a.csv.expected new file mode 100644 index 0000000..0f642c1 --- /dev/null +++ b/integrationtests/small_dmap1a.csv.expected @@ -0,0 +1,2 @@ +count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) +16,1002-071948,11.750000,0.000000,6.000000 diff --git a/integrationtests/small_dmap1a.csv.query.expected b/integrationtests/small_dmap1a.csv.query.expected new file mode 100644 index 0000000..6962b5c --- /dev/null +++ b/integrationtests/small_dmap1a.csv.query.expected @@ -0,0 +1 @@ +from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile small_dmap1a.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap1b.csv.expected b/integrationtests/small_dmap1b.csv.expected new file mode 100644 index 0000000..0f642c1 --- /dev/null +++ b/integrationtests/small_dmap1b.csv.expected @@ -0,0 +1,2 @@ +count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) +16,1002-071948,11.750000,0.000000,6.000000 diff --git a/integrationtests/small_dmap1b.csv.query.expected b/integrationtests/small_dmap1b.csv.query.expected new file mode 100644 index 0000000..2da6830 --- /dev/null +++ b/integrationtests/small_dmap1b.csv.query.expected @@ -0,0 +1 @@ +from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname where lifetimeConnections >= 3 outfile small_dmap1b.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap1c.csv.expected b/integrationtests/small_dmap1c.csv.expected new file mode 100644 index 0000000..ae51114 --- /dev/null +++ b/integrationtests/small_dmap1c.csv.expected @@ -0,0 +1 @@ +count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) diff --git a/integrationtests/small_dmap1c.csv.query.expected b/integrationtests/small_dmap1c.csv.query.expected new file mode 100644 index 0000000..434d9d6 --- /dev/null +++ b/integrationtests/small_dmap1c.csv.query.expected @@ -0,0 +1 @@ +from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname where $time eq "1002-071949" outfile small_dmap1c.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap1d.csv.expected b/integrationtests/small_dmap1d.csv.expected new file mode 100644 index 0000000..4172959 --- /dev/null +++ b/integrationtests/small_dmap1d.csv.expected @@ -0,0 +1,2 @@ +$mask,$md5,$foo,$bar,$baz,last($time) +....-......,f5cb931d12cb57fe1e46bc74ae0cc742,42,baz,1002-071948,1002-071948 diff --git a/integrationtests/small_dmap1d.csv.query.expected b/integrationtests/small_dmap1d.csv.query.expected new file mode 100644 index 0000000..7a5629f --- /dev/null +++ b/integrationtests/small_dmap1d.csv.query.expected @@ -0,0 +1 @@ +from STATS select $mask,$md5,$foo,$bar,$baz,last($time), set $mask = maskdigits($time), $md5 = md5sum($time), $foo = 42, $bar = "baz", $baz = $time group by $hostname outfile small_dmap1d.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap2.csv.expected b/integrationtests/small_dmap2.csv.expected new file mode 100644 index 0000000..1ad0609 --- /dev/null +++ b/integrationtests/small_dmap2.csv.expected @@ -0,0 +1,7 @@ +count($time),$time,max($goroutines),avg($goroutines),min($goroutines) +5,1002-071948,15.000000,13.400000,11.000000 +3,1002-071939,11.000000,11.000000,11.000000 +3,1002-071947,11.000000,11.000000,11.000000 +2,1002-071938,11.000000,11.000000,11.000000 +2,1002-071946,11.000000,11.000000,11.000000 +1,1002-071937,11.000000,11.000000,11.000000 diff --git a/integrationtests/small_dmap2.csv.query.expected b/integrationtests/small_dmap2.csv.query.expected new file mode 100644 index 0000000..b70ab07 --- /dev/null +++ b/integrationtests/small_dmap2.csv.query.expected @@ -0,0 +1 @@ +from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile small_dmap2.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap3.csv.expected b/integrationtests/small_dmap3.csv.expected new file mode 100644 index 0000000..8d024b0 --- /dev/null +++ b/integrationtests/small_dmap3.csv.expected @@ -0,0 +1,7 @@ +count($time),$time,max($goroutines),avg($goroutines),min($goroutines) +15,1002-071948,15.000000,13.400000,11.000000 +9,1002-071947,11.000000,11.000000,11.000000 +9,1002-071939,11.000000,11.000000,11.000000 +6,1002-071946,11.000000,11.000000,11.000000 +6,1002-071938,11.000000,11.000000,11.000000 +3,1002-071937,11.000000,11.000000,11.000000 diff --git a/integrationtests/small_dmap3.csv.query.expected b/integrationtests/small_dmap3.csv.query.expected new file mode 100644 index 0000000..c5e020f --- /dev/null +++ b/integrationtests/small_dmap3.csv.query.expected @@ -0,0 +1 @@ +from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile small_dmap3.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap4.csv.expected b/integrationtests/small_dmap4.csv.expected new file mode 100644 index 0000000..2d9720c --- /dev/null +++ b/integrationtests/small_dmap4.csv.expected @@ -0,0 +1,13 @@ +count($time),$time,max($goroutines),avg($goroutines),min($goroutines) +5,1002-071948,15.000000,13.400000,11.000000 +3,1002-071939,11.000000,11.000000,11.000000 +3,1002-071947,11.000000,11.000000,11.000000 +2,1002-071938,11.000000,11.000000,11.000000 +2,1002-071946,11.000000,11.000000,11.000000 +1,1002-071937,11.000000,11.000000,11.000000 +5,1002-071948,15.000000,13.400000,11.000000 +3,1002-071947,11.000000,11.000000,11.000000 +3,1002-071939,11.000000,11.000000,11.000000 +2,1002-071938,11.000000,11.000000,11.000000 +2,1002-071946,11.000000,11.000000,11.000000 +1,1002-071937,11.000000,11.000000,11.000000 diff --git a/integrationtests/small_dmap4.csv.query.expected b/integrationtests/small_dmap4.csv.query.expected new file mode 100644 index 0000000..980854e --- /dev/null +++ b/integrationtests/small_dmap4.csv.query.expected @@ -0,0 +1 @@ +from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile append small_dmap4.csv.tmp
\ No newline at end of file diff --git a/integrationtests/small_dmap5.csv.expected b/integrationtests/small_dmap5.csv.expected new file mode 100644 index 0000000..d9c80d5 --- /dev/null +++ b/integrationtests/small_dmap5.csv.expected @@ -0,0 +1,2 @@ +sum($timecount),last($time),min($min_goroutines) +43.000000,1002-071213,12.000000 diff --git a/integrationtests/small_dmap5.csv.in b/integrationtests/small_dmap5.csv.in new file mode 100644 index 0000000..f2593cf --- /dev/null +++ b/integrationtests/small_dmap5.csv.in @@ -0,0 +1,3 @@ +count($time),$time,max($goroutines),avg($goroutines),min($goroutines) +23,1002-071147,16.000000,14.391304,12.000000 +20,1002-071213,17.000000,14.100000,12.000000
\ No newline at end of file diff --git a/integrationtests/small_dmap5.csv.query.expected b/integrationtests/small_dmap5.csv.query.expected new file mode 100644 index 0000000..28d62da --- /dev/null +++ b/integrationtests/small_dmap5.csv.query.expected @@ -0,0 +1 @@ +select sum($timecount),last($time),min($min_goroutines), group by $hostname set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)` logformat csv outfile small_dmap5.csv.tmp
\ No newline at end of file |
