summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2023-09-05 17:17:38 +0300
committer Paul Buetow <pbuetow@mimecast.com> 2023-09-07 15:32:31 +0300
commit b2840a87699fabbe9426ae7d673d174fea03b895 (patch)
tree 84eb0938a73ceb99326ae140cf1897f5c35567fb
parent 30cf519ef131fc2660f8c5f5e68f7485dd2b98ea (diff)
add mapr aggregation on CSV integration test
-rw-r--r-- integrationtests/dmap5.csv.expected 2
-rw-r--r-- integrationtests/dmap5.csv.in 4
-rw-r--r-- integrationtests/dmap5.csv.query.expected 1
-rw-r--r-- integrationtests/dmap_test.go 57
-rw-r--r-- internal/mapr/logformat/csv.go 53
-rw-r--r-- internal/mapr/setcondition.go 7
6 files changed, 124 insertions, 0 deletions
diff --git a/integrationtests/dmap5.csv.expected b/integrationtests/dmap5.csv.expected
new file mode 100644
index 0000000..1323a61
--- /dev/null
+++ b/integrationtests/dmap5.csv.expected
@@ -0,0 +1,2 @@
+sum($timecount),last($time),min($min_goroutines)
+63.000000,1002-071143,12.000000
diff --git a/integrationtests/dmap5.csv.in b/integrationtests/dmap5.csv.in
new file mode 100644
index 0000000..98c7763
--- /dev/null
+++ b/integrationtests/dmap5.csv.in
@@ -0,0 +1,4 @@
+count($time),$time,max($goroutines),avg($goroutines),min($goroutines)
+23,1002-071147,16.000000,14.391304,12.000000
+20,1002-071213,17.000000,14.100000,12.000000
+20,1002-071143,17.000000,15.000000,13.000000
diff --git a/integrationtests/dmap5.csv.query.expected b/integrationtests/dmap5.csv.query.expected
new file mode 100644
index 0000000..2d1723c
--- /dev/null
+++ b/integrationtests/dmap5.csv.query.expected
@@ -0,0 +1 @@
+select sum($timecount),last($time),min($min_goroutines), group by $hostname set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)` logformat csv outfile dmap5.csv.tmp
\ No newline at end of file
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go
index a378fb5..f772243 100644
--- a/integrationtests/dmap_test.go
+++ b/integrationtests/dmap_test.go
@@ -245,3 +245,60 @@ func TestDMap4Append(t *testing.T) {
os.Remove(csvFile)
os.Remove(queryFile)
}
+
+// TestDMap5CSV runs the dmap binary with a "logformat csv" mapreduce query against the
+// CSV input file dmap5.csv.in and compares both the produced outfile and the generated
+// ".query" file with their expected counterparts. The test body is skipped unless
+// config.Env reports DTAIL_INTEGRATION_TEST_RUN_MODE.
+func TestDMap5CSV(t *testing.T) {
+ if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
+ t.Log("Skipping")
+ return
+ }
+ inFile := "dmap5.csv.in"
+ outFile := "dmap5.stdout.tmp"
+ csvFile := "dmap5.csv.tmp"
+ expectedCsvFile := "dmap5.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap5.csv.query.expected"
+
+ // Delete in case it exists already. Otherwise, test will fail.
+ os.Remove(csvFile)
+
+ // The backtick-quoted names on the right-hand side of "set" refer to the literal
+ // column names found in the CSV header of the input file (e.g. `count($time)`).
+ query := fmt.Sprintf("select sum($timecount),last($time),min($min_goroutines),"+
+ " group by $hostname"+
+ " set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)`"+
+ " logformat csv outfile %s", csvFile)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Run the dmap command twice against the same input. The query uses plain "outfile"
+ // (no "append") and the expected CSV contains only a single result row, so the second
+ // run presumably overwrites the first run's output rather than appending to it.
+ // NOTE(review): confirm that running the command twice is intentional here.
+ for i := 0; i < 2; i++ {
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor", inFile)
+
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ }
+
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ // Clean up the temporary files. This is only reached when both comparisons passed;
+ // on failure the files are left in place.
+ // NOTE(review): outFile is not referenced anywhere else in this test.
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+}
diff --git a/internal/mapr/logformat/csv.go b/internal/mapr/logformat/csv.go
new file mode 100644
index 0000000..ea85ca9
--- /dev/null
+++ b/internal/mapr/logformat/csv.go
@@ -0,0 +1,53 @@
+package logformat
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// csvParser turns CSV lines into field maps. The first line passed to MakeFields is
+// taken as the header; its values become the field names for all following lines.
+type csvParser struct {
+ defaultParser
+ // header holds the column names split from the first line seen.
+ header []string
+ // hasHeader is set once the header line has been consumed.
+ hasHeader bool
+}
+
+// newCSVParser creates a CSV parser layered on a default parser built from the given
+// hostname and time zone settings. When the default parser cannot be created, its
+// error is returned together with an empty csvParser.
+func newCSVParser(hostname, timeZoneName string, timeZoneOffset int) (*csvParser, error) {
+	base, err := newDefaultParser(hostname, timeZoneName, timeZoneOffset)
+	if err != nil {
+		return &csvParser{}, err
+	}
+
+	parser := csvParser{defaultParser: *base}
+	return &parser, nil
+}
+
+// MakeFields converts one CSV line into a field map.
+//
+// The first line it ever receives is stored as the header and answered with
+// ErrIgnoreFields. Every later line is split at protocol.CSVDelimiter and each value is
+// stored under the header name at the same position, next to the built-in fields
+// ("*", $hostname, $server, $line, $empty, $timezone, $timeoffset). A line carrying
+// more values than the header has names yields an error; the map returned with it
+// still holds the values that could be matched to a header name.
+func (p *csvParser) MakeFields(maprLine string) (map[string]string, error) {
+	if !p.hasHeader {
+		// Header line: remember the column names, there is nothing to emit yet.
+		p.parseHeader(maprLine)
+		return nil, ErrIgnoreFields
+	}
+
+	columns := len(p.header)
+	values := strings.Split(maprLine, protocol.CSVDelimiter)
+
+	// Seven built-in fields plus one entry per CSV column.
+	fields := make(map[string]string, 7+columns)
+	fields["*"] = "*"
+	fields["$hostname"] = p.hostname
+	fields["$server"] = p.hostname
+	fields["$line"] = maprLine
+	fields["$empty"] = ""
+	fields["$timezone"] = p.timeZoneName
+	fields["$timeoffset"] = p.timeZoneOffset
+
+	// Map every value that has a matching header name.
+	for i := 0; i < len(values) && i < columns; i++ {
+		fields[p.header[i]] = values[i]
+	}
+
+	if len(values) > columns {
+		return fields, fmt.Errorf("CSV file seems corrupted, more fields than header values?")
+	}
+	return fields, nil
+}
+
+// parseHeader splits maprLine at protocol.CSVDelimiter, stores the parts as the column
+// names and marks the header as seen.
+func (p *csvParser) parseHeader(maprLine string) {
+	p.hasHeader = true
+	p.header = strings.Split(maprLine, protocol.CSVDelimiter)
+}
diff --git a/internal/mapr/setcondition.go b/internal/mapr/setcondition.go
index 9dcd690..308a0f4 100644
--- a/internal/mapr/setcondition.go
+++ b/internal/mapr/setcondition.go
@@ -37,6 +37,13 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) {
return sc, nil, err
}
+ // Seems like a quoted string? E.g.: "set $foo = `count(bar)`"
+ // So don't interpret `count` as a function!
+ if tokens[2].quotesStripped {
+ sc.rType = Field
+ return sc, tokens[3:], nil
+ }
+
// Seems like a function call?
if strings.HasSuffix(sc.rString, ")") {
functionStack, functionArg, err := funcs.NewFunctionStack(tokens[2].str)