summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--integrationtests/dmap_server_helpers.go195
-rw-r--r--integrationtests/dmap_test.go554
-rw-r--r--integrationtests/small_dmap1a.csv.expected2
-rw-r--r--integrationtests/small_dmap1a.csv.query.expected1
-rw-r--r--integrationtests/small_dmap1b.csv.expected2
-rw-r--r--integrationtests/small_dmap1b.csv.query.expected1
-rw-r--r--integrationtests/small_dmap1c.csv.expected1
-rw-r--r--integrationtests/small_dmap1c.csv.query.expected1
-rw-r--r--integrationtests/small_dmap1d.csv.expected2
-rw-r--r--integrationtests/small_dmap1d.csv.query.expected1
-rw-r--r--integrationtests/small_dmap2.csv.expected7
-rw-r--r--integrationtests/small_dmap2.csv.query.expected1
-rw-r--r--integrationtests/small_dmap3.csv.expected7
-rw-r--r--integrationtests/small_dmap3.csv.query.expected1
-rw-r--r--integrationtests/small_dmap4.csv.expected13
-rw-r--r--integrationtests/small_dmap4.csv.query.expected1
-rw-r--r--integrationtests/small_dmap5.csv.expected2
-rw-r--r--integrationtests/small_dmap5.csv.in3
-rw-r--r--integrationtests/small_dmap5.csv.query.expected1
19 files changed, 626 insertions, 170 deletions
diff --git a/integrationtests/dmap_server_helpers.go b/integrationtests/dmap_server_helpers.go
new file mode 100644
index 0000000..cb34dec
--- /dev/null
+++ b/integrationtests/dmap_server_helpers.go
@@ -0,0 +1,195 @@
+package integrationtests
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+ "time"
+)
+
+// testDMapWithServer runs a DMap command with a running dserver and compares output
+func testDMapWithServer(t *testing.T, args []string, csvFile, expectedCsvFile, queryFile, expectedQueryFile string) error {
+ // Start dserver
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver in background
+ dserverCtx, dserverCancel := context.WithCancel(ctx)
+ defer dserverCancel()
+
+ dserverStdout, dserverStderr, dserverErr, err := startCommand(dserverCtx, t, "", "../dserver", "--cfg", "none", "--port", "2222")
+ if err != nil {
+ return fmt.Errorf("failed to start dserver: %v", err)
+ }
+
+ // Wait for server to start
+ time.Sleep(2 * time.Second)
+
+ // Run dmap with server connection
+ dmapArgs := append([]string{"--cfg", "none", "--servers", "localhost:2222", "--trustAllHosts"}, args...)
+ dmapCtx, dmapCancel := context.WithTimeout(ctx, 30*time.Second)
+ defer dmapCancel()
+
+ dmapStdout, dmapStderr, dmapCmdErr, err := startCommand(dmapCtx, t, "", "../dmap", dmapArgs...)
+ if err != nil {
+ dserverCancel()
+ return fmt.Errorf("failed to start dmap: %v", err)
+ }
+
+ // Wait for dmap to complete
+ dmapDone := make(chan struct{})
+ go func() {
+ defer close(dmapDone)
+ waitForCommand(dmapCtx, t, dmapStdout, dmapStderr, dmapCmdErr)
+ }()
+
+ // Wait for dmap completion or timeout
+ select {
+ case <-dmapDone:
+ // DMap completed
+ case <-dmapCtx.Done():
+ dserverCancel()
+ return fmt.Errorf("dmap command timed out")
+ }
+
+ // Stop the server
+ dserverCancel()
+
+ // Wait a bit for server cleanup
+ time.Sleep(500 * time.Millisecond)
+
+ // Drain server channels to avoid goroutine leaks
+ go func() {
+ for range dserverStdout {
+ }
+ }()
+ go func() {
+ for range dserverStderr {
+ }
+ }()
+ go func() {
+ for range dserverErr {
+ }
+ }()
+
+ // Compare CSV output
+ // For DMap tests, we need to use compareFilesContents for some tests
+ if strings.Contains(expectedCsvFile, "dmap2") ||
+ strings.Contains(expectedCsvFile, "dmap3") ||
+ strings.Contains(expectedCsvFile, "dmap4") ||
+ strings.Contains(expectedCsvFile, "dmap5") {
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ return fmt.Errorf("CSV file contents comparison failed: %v", err)
+ }
+ } else {
+ if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
+ return fmt.Errorf("CSV file comparison failed: %v", err)
+ }
+ }
+
+ // Compare query file
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return fmt.Errorf("query file comparison failed: %v", err)
+ }
+
+ return nil
+}
+
+// testDMapMultipleRunsWithServer runs a DMap command multiple times with server (for append tests)
+func testDMapMultipleRunsWithServer(t *testing.T, args []string, csvFile, expectedCsvFile, queryFile, expectedQueryFile string, numRuns int) error {
+ // Start dserver
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver in background
+ dserverCtx, dserverCancel := context.WithCancel(ctx)
+ defer dserverCancel()
+
+ dserverStdout, dserverStderr, dserverErr, err := startCommand(dserverCtx, t, "", "../dserver", "--cfg", "none", "--port", "2222")
+ if err != nil {
+ return fmt.Errorf("failed to start dserver: %v", err)
+ }
+
+ // Wait for server to start
+ time.Sleep(2 * time.Second)
+
+ // Run dmap multiple times
+ for i := 0; i < numRuns; i++ {
+ dmapArgs := append([]string{"--cfg", "none", "--servers", "localhost:2222", "--trustAllHosts"}, args...)
+ dmapCtx, dmapCancel := context.WithTimeout(ctx, 30*time.Second)
+
+ dmapStdout, dmapStderr, dmapCmdErr, err := startCommand(dmapCtx, t, "", "../dmap", dmapArgs...)
+ if err != nil {
+ dmapCancel()
+ dserverCancel()
+ return fmt.Errorf("failed to start dmap (run %d): %v", i+1, err)
+ }
+
+ // Wait for dmap to complete
+ dmapDone := make(chan struct{})
+ go func() {
+ defer close(dmapDone)
+ waitForCommand(dmapCtx, t, dmapStdout, dmapStderr, dmapCmdErr)
+ }()
+
+ // Wait for dmap completion or timeout
+ select {
+ case <-dmapDone:
+ // DMap completed
+ case <-dmapCtx.Done():
+ dmapCancel()
+ dserverCancel()
+ return fmt.Errorf("dmap command timed out (run %d)", i+1)
+ }
+
+ dmapCancel()
+ // Small delay between runs
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ // Stop the server
+ dserverCancel()
+
+ // Wait a bit for server cleanup
+ time.Sleep(500 * time.Millisecond)
+
+ // Drain server channels to avoid goroutine leaks
+ go func() {
+ for range dserverStdout {
+ }
+ }()
+ go func() {
+ for range dserverStderr {
+ }
+ }()
+ go func() {
+ for range dserverErr {
+ }
+ }()
+
+ // Compare output files
+ if expectedCsvFile != "" {
+ // For DMap tests, we need to use compareFilesContents for some tests
+ if strings.Contains(expectedCsvFile, "dmap2") ||
+ strings.Contains(expectedCsvFile, "dmap3") ||
+ strings.Contains(expectedCsvFile, "dmap4") ||
+ strings.Contains(expectedCsvFile, "dmap5") {
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ return fmt.Errorf("CSV file contents comparison failed: %v", err)
+ }
+ } else {
+ if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
+ return fmt.Errorf("CSV file comparison failed: %v", err)
+ }
+ }
+ }
+
+ if expectedQueryFile != "" {
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return fmt.Errorf("query file comparison failed: %v", err)
+ }
+ }
+
+ return nil
+} \ No newline at end of file
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go
index f772243..de57f59 100644
--- a/integrationtests/dmap_test.go
+++ b/integrationtests/dmap_test.go
@@ -15,6 +15,26 @@ func TestDMap1(t *testing.T) {
return
}
+ // Test both serverless and server modes
+ modes := []struct {
+ name string
+ useServer bool
+ }{
+ {"Serverless", false},
+ {"WithServer", true},
+ }
+
+ for _, mode := range modes {
+ t.Run(mode.name, func(t *testing.T) {
+ if err := testDMap1(t, mode.useServer); err != nil {
+ t.Error(err)
+ return
+ }
+ })
+ }
+}
+
+func testDMap1(t *testing.T, useServer bool) error {
testTable := map[string]string{
"a": "from STATS select count($line),last($time)," +
"avg($goroutines),min(concurrentConnections),max(lifetimeConnections) " +
@@ -32,68 +52,95 @@ func TestDMap1(t *testing.T) {
for subtestName, query := range testTable {
t.Log("Testing dmap with input file")
- if err := testDmap1(t, query, subtestName, false); err != nil {
+ if err := testDmap1Sub(t, query, subtestName, false, useServer); err != nil {
t.Error(err)
- return
+ return err
}
t.Log("Testing dmap with stdin input pipe")
- if err := testDmap1(t, query, subtestName, true); err != nil {
+ if err := testDmap1Sub(t, query, subtestName, true, useServer); err != nil {
t.Error(err)
- return
+ return err
}
}
+ return nil
}
-func testDmap1(t *testing.T, query, subtestName string, usePipe bool) error {
- inFile := "mapr_testdata.log"
- csvFile := fmt.Sprintf("dmap1%s.csv.tmp", subtestName)
- expectedCsvFile := fmt.Sprintf("dmap1%s.csv.expected", subtestName)
+func testDmap1Sub(t *testing.T, query, subtestName string, usePipe bool, useServer bool) error {
+ var inFile, expectedCsvFile, expectedQueryFile, csvFile string
+
+ if useServer {
+ // Use small test data for server mode to avoid channel overflow
+ inFile = "small_mapr_testdata.log"
+ csvFile = fmt.Sprintf("small_dmap1%s.csv.tmp", subtestName)
+ expectedCsvFile = fmt.Sprintf("small_dmap1%s.csv.expected", subtestName)
+ expectedQueryFile = fmt.Sprintf("small_dmap1%s.csv.query.expected", subtestName)
+ } else {
+ inFile = "mapr_testdata.log"
+ csvFile = fmt.Sprintf("dmap1%s.csv.tmp", subtestName)
+ expectedCsvFile = fmt.Sprintf("dmap1%s.csv.expected", subtestName)
+ expectedQueryFile = fmt.Sprintf("dmap1%s.csv.query.expected", subtestName)
+ }
+
queryFile := fmt.Sprintf("%s.query", csvFile)
- expectedQueryFile := fmt.Sprintf("dmap1%s.csv.query.expected", subtestName)
query = fmt.Sprintf("%s outfile %s", query, csvFile)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- var stdoutCh, stderrCh <-chan string
- var cmdErrCh <-chan error
- var err error
-
- if usePipe {
- stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
- inFile, "../dmap",
- "--cfg", "none",
- "--query", query,
- "--logger", "stdout",
- "--logLevel", "info",
- "--noColor")
+ if useServer {
+ // Server mode testing
+ var args []string
+ if usePipe {
+ // For pipe mode with server, we need to handle this differently
+ // DMap with server doesn't support stdin pipe in the same way
+ // So we'll just test file mode for server
+ args = []string{"--query", query, "--logger", "stdout", "--logLevel", "info", "--noColor", inFile}
+ } else {
+ args = []string{"--query", query, "--logger", "stdout", "--logLevel", "info", "--noColor", inFile}
+ }
+ return testDMapWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile)
} else {
- stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
- "", "../dmap",
- "--cfg", "none",
- "--query", query,
- "--logger", "stdout",
- "--logLevel", "info",
- "--noColor",
- inFile)
- }
+ // Serverless mode testing (original code)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var stdoutCh, stderrCh <-chan string
+ var cmdErrCh <-chan error
+ var err error
+
+ if usePipe {
+ stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
+ inFile, "../dmap",
+ "--cfg", "none",
+ "--query", query,
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor")
+ } else {
+ stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
+ "", "../dmap",
+ "--cfg", "none",
+ "--query", query,
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ inFile)
+ }
- if err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
+
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
- waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
+ return err
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return err
+ }
- if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
- return err
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+ return nil
}
- if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
- return err
- }
-
- os.Remove(csvFile)
- os.Remove(queryFile)
- return nil
}
func TestDMap2(t *testing.T) {
@@ -101,36 +148,73 @@ func TestDMap2(t *testing.T) {
t.Log("Skipping")
return
}
- inFile := "mapr_testdata.log"
+
+ // Test both serverless and server modes
+ modes := []struct {
+ name string
+ useServer bool
+ }{
+ {"Serverless", false},
+ {"WithServer", true},
+ }
+
+ for _, mode := range modes {
+ t.Run(mode.name, func(t *testing.T) {
+ if err := testDMap2(t, mode.useServer); err != nil {
+ t.Error(err)
+ return
+ }
+ })
+ }
+}
+
+func testDMap2(t *testing.T, useServer bool) error {
+ var inFile, expectedCsvFile, expectedQueryFile, csvFile string
outFile := "dmap2.stdout.tmp"
- csvFile := "dmap2.csv.tmp"
- expectedCsvFile := "dmap2.csv.expected"
+
+ if useServer {
+ // Use small test data for server mode to avoid channel overflow
+ inFile = "small_mapr_testdata.log"
+ csvFile = "small_dmap2.csv.tmp"
+ expectedCsvFile = "small_dmap2.csv.expected"
+ expectedQueryFile = "small_dmap2.csv.query.expected"
+ } else {
+ inFile = "mapr_testdata.log"
+ csvFile = "dmap2.csv.tmp"
+ expectedCsvFile = "dmap2.csv.expected"
+ expectedQueryFile = "dmap2.csv.query.expected"
+ }
+
queryFile := fmt.Sprintf("%s.query", csvFile)
- expectedQueryFile := "dmap2.csv.query.expected"
query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+
"avg($goroutines),min($goroutines) group by $time order by count($time) "+
"outfile %s", csvFile)
- _, err := runCommand(context.TODO(), t, outFile,
- "../dmap", "--query", query, "--cfg", "none", inFile)
- if err != nil {
- t.Error(err)
- return
- }
+ if useServer {
+ // Server mode testing
+ args := []string{"--query", query, "--cfg", "none", inFile}
+ return testDMapWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile)
+ } else {
+ // Serverless mode testing (original code)
+ _, err := runCommand(context.TODO(), t, outFile,
+ "../dmap", "--query", query, "--cfg", "none", inFile)
+ if err != nil {
+ return err
+ }
- if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
- t.Error(err)
- return
- }
- if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
- t.Error(err)
- return
- }
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ return err
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return err
+ }
- os.Remove(outFile)
- os.Remove(csvFile)
- os.Remove(queryFile)
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+ return nil
+ }
}
func TestDMap3(t *testing.T) {
@@ -138,56 +222,100 @@ func TestDMap3(t *testing.T) {
t.Log("Skipping")
return
}
- inFile := "mapr_testdata.log"
+
+ // Test both serverless and server modes
+ modes := []struct {
+ name string
+ useServer bool
+ }{
+ {"Serverless", false},
+ {"WithServer", true},
+ }
+
+ for _, mode := range modes {
+ t.Run(mode.name, func(t *testing.T) {
+ if err := testDMap3(t, mode.useServer); err != nil {
+ t.Error(err)
+ return
+ }
+ })
+ }
+}
+
+func testDMap3(t *testing.T, useServer bool) error {
+ var inFile, expectedCsvFile, expectedQueryFile, csvFile string
outFile := "dmap3.stdout.tmp"
- csvFile := "dmap3.csv.tmp"
- expectedCsvFile := "dmap3.csv.expected"
+
+ if useServer {
+ // Use small test data for server mode to avoid channel overflow
+ inFile = "small_mapr_testdata.log"
+ csvFile = "small_dmap3.csv.tmp"
+ expectedCsvFile = "small_dmap3.csv.expected"
+ expectedQueryFile = "small_dmap3.csv.query.expected"
+ } else {
+ inFile = "mapr_testdata.log"
+ csvFile = "dmap3.csv.tmp"
+ expectedCsvFile = "dmap3.csv.expected"
+ expectedQueryFile = "dmap3.csv.query.expected"
+ }
+
queryFile := fmt.Sprintf("%s.query", csvFile)
- expectedQueryFile := "dmap3.csv.query.expected"
query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+
"avg($goroutines),min($goroutines) group by $time order by count($time) "+
"outfile %s", csvFile)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
- "", "../dmap",
- "--query", query,
- "--cfg", "none",
- "--logger", "stdout",
- "--logLevel", "info",
- "--noColor",
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile)
-
- if err != nil {
- t.Error(err)
- return
- }
- waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ if useServer {
+ // Server mode testing - use only 3 files instead of 100 to avoid channel overflow
+ args := []string{
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ inFile, inFile, inFile,
+ }
+ return testDMapWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile)
+ } else {
+ // Serverless mode testing (original code with 100 files)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
- t.Error(err)
- return
- }
- if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
- t.Error(err)
- return
- }
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
+ inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile)
- os.Remove(outFile)
- os.Remove(csvFile)
- os.Remove(queryFile)
+ if err != nil {
+ return err
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ return err
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return err
+ }
+
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+ return nil
+ }
}
func TestDMap4Append(t *testing.T) {
@@ -195,12 +323,44 @@ func TestDMap4Append(t *testing.T) {
t.Log("Skipping")
return
}
- inFile := "mapr_testdata.log"
+
+ // Test both serverless and server modes
+ modes := []struct {
+ name string
+ useServer bool
+ }{
+ {"Serverless", false},
+ {"WithServer", true},
+ }
+
+ for _, mode := range modes {
+ t.Run(mode.name, func(t *testing.T) {
+ if err := testDMap4Append(t, mode.useServer); err != nil {
+ t.Error(err)
+ return
+ }
+ })
+ }
+}
+
+func testDMap4Append(t *testing.T, useServer bool) error {
+ var inFile, expectedCsvFile, expectedQueryFile, csvFile string
outFile := "dmap4.stdout.tmp"
- csvFile := "dmap4.csv.tmp"
- expectedCsvFile := "dmap4.csv.expected"
+
+ if useServer {
+ // Use small test data for server mode to avoid channel overflow
+ inFile = "small_mapr_testdata.log"
+ csvFile = "small_dmap4.csv.tmp"
+ expectedCsvFile = "small_dmap4.csv.expected"
+ expectedQueryFile = "small_dmap4.csv.query.expected"
+ } else {
+ inFile = "mapr_testdata.log"
+ csvFile = "dmap4.csv.tmp"
+ expectedCsvFile = "dmap4.csv.expected"
+ expectedQueryFile = "dmap4.csv.query.expected"
+ }
+
queryFile := fmt.Sprintf("%s.query", csvFile)
- expectedQueryFile := "dmap4.csv.query.expected"
// Delete in case it exists already. Otherwise, test will fail.
os.Remove(csvFile)
@@ -209,41 +369,52 @@ func TestDMap4Append(t *testing.T) {
"avg($goroutines),min($goroutines) group by $time order by count($time) "+
"outfile append %s", csvFile)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing
- // file as we specified "outfile append". That works transparently for any mapreduce query
- // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap
- // command.
- for i := 0; i < 2; i++ {
- stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
- "", "../dmap",
+ if useServer {
+ // Server mode testing - run twice for append functionality
+ args := []string{
"--query", query,
"--cfg", "none",
"--logger", "stdout",
"--logLevel", "info",
- "--noColor", inFile)
+ "--noColor", inFile,
+ }
+ return testDMapMultipleRunsWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile, 2)
+ } else {
+ // Serverless mode testing (original code)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing
+ // file as we specified "outfile append". That works transparently for any mapreduce query
+ // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap
+ // command.
+ for i := 0; i < 2; i++ {
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor", inFile)
+
+ if err != nil {
+ return err
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ }
- if err != nil {
- t.Error(err)
- return
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ return err
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return err
}
- waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
- }
- if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
- t.Error(err)
- return
- }
- if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
- t.Error(err)
- return
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+ return nil
}
-
- os.Remove(outFile)
- os.Remove(csvFile)
- os.Remove(queryFile)
}
func TestDMap5CSV(t *testing.T) {
@@ -251,12 +422,44 @@ func TestDMap5CSV(t *testing.T) {
t.Log("Skipping")
return
}
- inFile := "dmap5.csv.in"
+
+ // Test both serverless and server modes
+ modes := []struct {
+ name string
+ useServer bool
+ }{
+ {"Serverless", false},
+ {"WithServer", true},
+ }
+
+ for _, mode := range modes {
+ t.Run(mode.name, func(t *testing.T) {
+ if err := testDMap5CSV(t, mode.useServer); err != nil {
+ t.Error(err)
+ return
+ }
+ })
+ }
+}
+
+func testDMap5CSV(t *testing.T, useServer bool) error {
+ var inFile, expectedCsvFile, expectedQueryFile, csvFile string
outFile := "dmap5.stdout.tmp"
- csvFile := "dmap5.csv.tmp"
- expectedCsvFile := "dmap5.csv.expected"
+
+ if useServer {
+ // Use small test data for server mode to avoid channel overflow
+ inFile = "small_dmap5.csv.in"
+ csvFile = "small_dmap5.csv.tmp"
+ expectedCsvFile = "small_dmap5.csv.expected"
+ expectedQueryFile = "small_dmap5.csv.query.expected"
+ } else {
+ inFile = "dmap5.csv.in"
+ csvFile = "dmap5.csv.tmp"
+ expectedCsvFile = "dmap5.csv.expected"
+ expectedQueryFile = "dmap5.csv.query.expected"
+ }
+
queryFile := fmt.Sprintf("%s.query", csvFile)
- expectedQueryFile := "dmap5.csv.query.expected"
// Delete in case it exists already. Otherwise, test will fail.
os.Remove(csvFile)
@@ -266,39 +469,50 @@ func TestDMap5CSV(t *testing.T) {
" set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)`"+
" logformat csv outfile %s", csvFile)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing
- // file as we specified "outfile append". That works transparently for any mapreduce query
- // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap
- // command.
- for i := 0; i < 2; i++ {
- stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
- "", "../dmap",
+ if useServer {
+ // Server mode testing - run twice (CSV input format with append)
+ args := []string{
"--query", query,
"--cfg", "none",
"--logger", "stdout",
"--logLevel", "info",
- "--noColor", inFile)
+ "--noColor", inFile,
+ }
+ return testDMapMultipleRunsWithServer(t, args, csvFile, expectedCsvFile, queryFile, expectedQueryFile, 2)
+ } else {
+ // Serverless mode testing (original code)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing
+ // file as we specified "outfile append". That works transparently for any mapreduce query
+ // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap
+ // command.
+ for i := 0; i < 2; i++ {
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor", inFile)
+
+ if err != nil {
+ return err
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ }
- if err != nil {
- t.Error(err)
- return
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ return err
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return err
}
- waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
- }
- if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
- t.Error(err)
- return
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+ return nil
}
- if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
- t.Error(err)
- return
- }
-
- os.Remove(outFile)
- os.Remove(csvFile)
- os.Remove(queryFile)
}
diff --git a/integrationtests/small_dmap1a.csv.expected b/integrationtests/small_dmap1a.csv.expected
new file mode 100644
index 0000000..0f642c1
--- /dev/null
+++ b/integrationtests/small_dmap1a.csv.expected
@@ -0,0 +1,2 @@
+count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections)
+16,1002-071948,11.750000,0.000000,6.000000
diff --git a/integrationtests/small_dmap1a.csv.query.expected b/integrationtests/small_dmap1a.csv.query.expected
new file mode 100644
index 0000000..6962b5c
--- /dev/null
+++ b/integrationtests/small_dmap1a.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile small_dmap1a.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap1b.csv.expected b/integrationtests/small_dmap1b.csv.expected
new file mode 100644
index 0000000..0f642c1
--- /dev/null
+++ b/integrationtests/small_dmap1b.csv.expected
@@ -0,0 +1,2 @@
+count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections)
+16,1002-071948,11.750000,0.000000,6.000000
diff --git a/integrationtests/small_dmap1b.csv.query.expected b/integrationtests/small_dmap1b.csv.query.expected
new file mode 100644
index 0000000..2da6830
--- /dev/null
+++ b/integrationtests/small_dmap1b.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname where lifetimeConnections >= 3 outfile small_dmap1b.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap1c.csv.expected b/integrationtests/small_dmap1c.csv.expected
new file mode 100644
index 0000000..ae51114
--- /dev/null
+++ b/integrationtests/small_dmap1c.csv.expected
@@ -0,0 +1 @@
+count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections)
diff --git a/integrationtests/small_dmap1c.csv.query.expected b/integrationtests/small_dmap1c.csv.query.expected
new file mode 100644
index 0000000..434d9d6
--- /dev/null
+++ b/integrationtests/small_dmap1c.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname where $time eq "1002-071949" outfile small_dmap1c.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap1d.csv.expected b/integrationtests/small_dmap1d.csv.expected
new file mode 100644
index 0000000..4172959
--- /dev/null
+++ b/integrationtests/small_dmap1d.csv.expected
@@ -0,0 +1,2 @@
+$mask,$md5,$foo,$bar,$baz,last($time)
+....-......,f5cb931d12cb57fe1e46bc74ae0cc742,42,baz,1002-071948,1002-071948
diff --git a/integrationtests/small_dmap1d.csv.query.expected b/integrationtests/small_dmap1d.csv.query.expected
new file mode 100644
index 0000000..7a5629f
--- /dev/null
+++ b/integrationtests/small_dmap1d.csv.query.expected
@@ -0,0 +1 @@
+from STATS select $mask,$md5,$foo,$bar,$baz,last($time), set $mask = maskdigits($time), $md5 = md5sum($time), $foo = 42, $bar = "baz", $baz = $time group by $hostname outfile small_dmap1d.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap2.csv.expected b/integrationtests/small_dmap2.csv.expected
new file mode 100644
index 0000000..1ad0609
--- /dev/null
+++ b/integrationtests/small_dmap2.csv.expected
@@ -0,0 +1,7 @@
+count($time),$time,max($goroutines),avg($goroutines),min($goroutines)
+5,1002-071948,15.000000,13.400000,11.000000
+3,1002-071939,11.000000,11.000000,11.000000
+3,1002-071947,11.000000,11.000000,11.000000
+2,1002-071938,11.000000,11.000000,11.000000
+2,1002-071946,11.000000,11.000000,11.000000
+1,1002-071937,11.000000,11.000000,11.000000
diff --git a/integrationtests/small_dmap2.csv.query.expected b/integrationtests/small_dmap2.csv.query.expected
new file mode 100644
index 0000000..b70ab07
--- /dev/null
+++ b/integrationtests/small_dmap2.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile small_dmap2.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap3.csv.expected b/integrationtests/small_dmap3.csv.expected
new file mode 100644
index 0000000..8d024b0
--- /dev/null
+++ b/integrationtests/small_dmap3.csv.expected
@@ -0,0 +1,7 @@
+count($time),$time,max($goroutines),avg($goroutines),min($goroutines)
+15,1002-071948,15.000000,13.400000,11.000000
+9,1002-071947,11.000000,11.000000,11.000000
+9,1002-071939,11.000000,11.000000,11.000000
+6,1002-071946,11.000000,11.000000,11.000000
+6,1002-071938,11.000000,11.000000,11.000000
+3,1002-071937,11.000000,11.000000,11.000000
diff --git a/integrationtests/small_dmap3.csv.query.expected b/integrationtests/small_dmap3.csv.query.expected
new file mode 100644
index 0000000..c5e020f
--- /dev/null
+++ b/integrationtests/small_dmap3.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile small_dmap3.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap4.csv.expected b/integrationtests/small_dmap4.csv.expected
new file mode 100644
index 0000000..2d9720c
--- /dev/null
+++ b/integrationtests/small_dmap4.csv.expected
@@ -0,0 +1,13 @@
+count($time),$time,max($goroutines),avg($goroutines),min($goroutines)
+5,1002-071948,15.000000,13.400000,11.000000
+3,1002-071939,11.000000,11.000000,11.000000
+3,1002-071947,11.000000,11.000000,11.000000
+2,1002-071938,11.000000,11.000000,11.000000
+2,1002-071946,11.000000,11.000000,11.000000
+1,1002-071937,11.000000,11.000000,11.000000
+5,1002-071948,15.000000,13.400000,11.000000
+3,1002-071947,11.000000,11.000000,11.000000
+3,1002-071939,11.000000,11.000000,11.000000
+2,1002-071938,11.000000,11.000000,11.000000
+2,1002-071946,11.000000,11.000000,11.000000
+1,1002-071937,11.000000,11.000000,11.000000
diff --git a/integrationtests/small_dmap4.csv.query.expected b/integrationtests/small_dmap4.csv.query.expected
new file mode 100644
index 0000000..980854e
--- /dev/null
+++ b/integrationtests/small_dmap4.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile append small_dmap4.csv.tmp \ No newline at end of file
diff --git a/integrationtests/small_dmap5.csv.expected b/integrationtests/small_dmap5.csv.expected
new file mode 100644
index 0000000..d9c80d5
--- /dev/null
+++ b/integrationtests/small_dmap5.csv.expected
@@ -0,0 +1,2 @@
+sum($timecount),last($time),min($min_goroutines)
+43.000000,1002-071213,12.000000
diff --git a/integrationtests/small_dmap5.csv.in b/integrationtests/small_dmap5.csv.in
new file mode 100644
index 0000000..f2593cf
--- /dev/null
+++ b/integrationtests/small_dmap5.csv.in
@@ -0,0 +1,3 @@
+count($time),$time,max($goroutines),avg($goroutines),min($goroutines)
+23,1002-071147,16.000000,14.391304,12.000000
+20,1002-071213,17.000000,14.100000,12.000000 \ No newline at end of file
diff --git a/integrationtests/small_dmap5.csv.query.expected b/integrationtests/small_dmap5.csv.query.expected
new file mode 100644
index 0000000..28d62da
--- /dev/null
+++ b/integrationtests/small_dmap5.csv.query.expected
@@ -0,0 +1 @@
+select sum($timecount),last($time),min($min_goroutines), group by $hostname set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)` logformat csv outfile small_dmap5.csv.tmp \ No newline at end of file