diff options
| author | Paul Buetow <paul@buetow.org> | 2023-09-05 17:17:38 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2023-09-07 15:32:31 +0300 |
| commit | b2840a87699fabbe9426ae7d673d174fea03b895 (patch) | |
| tree | 84eb0938a73ceb99326ae140cf1897f5c35567fb | |
| parent | 30cf519ef131fc2660f8c5f5e68f7485dd2b98ea (diff) | |
add mapr aggregation on CSV integration test
| -rw-r--r-- | integrationtests/dmap5.csv.expected | 2 | ||||
| -rw-r--r-- | integrationtests/dmap5.csv.in | 4 | ||||
| -rw-r--r-- | integrationtests/dmap5.csv.query.expected | 1 | ||||
| -rw-r--r-- | integrationtests/dmap_test.go | 57 | ||||
| -rw-r--r-- | internal/mapr/logformat/csv.go | 53 | ||||
| -rw-r--r-- | internal/mapr/setcondition.go | 7 |
6 files changed, 124 insertions, 0 deletions
diff --git a/integrationtests/dmap5.csv.expected b/integrationtests/dmap5.csv.expected new file mode 100644 index 0000000..1323a61 --- /dev/null +++ b/integrationtests/dmap5.csv.expected @@ -0,0 +1,2 @@ +sum($timecount),last($time),min($min_goroutines) +63.000000,1002-071143,12.000000 diff --git a/integrationtests/dmap5.csv.in b/integrationtests/dmap5.csv.in new file mode 100644 index 0000000..98c7763 --- /dev/null +++ b/integrationtests/dmap5.csv.in @@ -0,0 +1,4 @@ +count($time),$time,max($goroutines),avg($goroutines),min($goroutines) +23,1002-071147,16.000000,14.391304,12.000000 +20,1002-071213,17.000000,14.100000,12.000000 +20,1002-071143,17.000000,15.000000,13.000000 diff --git a/integrationtests/dmap5.csv.query.expected b/integrationtests/dmap5.csv.query.expected new file mode 100644 index 0000000..2d1723c --- /dev/null +++ b/integrationtests/dmap5.csv.query.expected @@ -0,0 +1 @@ +select sum($timecount),last($time),min($min_goroutines), group by $hostname set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)` logformat csv outfile dmap5.csv.tmp
\ No newline at end of file diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index a378fb5..f772243 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -245,3 +245,60 @@ func TestDMap4Append(t *testing.T) { os.Remove(csvFile) os.Remove(queryFile) } + +func TestDMap5CSV(t *testing.T) { + if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { + t.Log("Skipping") + return + } + inFile := "dmap5.csv.in" + outFile := "dmap5.stdout.tmp" + csvFile := "dmap5.csv.tmp" + expectedCsvFile := "dmap5.csv.expected" + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := "dmap5.csv.query.expected" + + // Delete in case it exists already. Otherwise, test will fail. + os.Remove(csvFile) + + query := fmt.Sprintf("select sum($timecount),last($time),min($min_goroutines),"+ + " group by $hostname"+ + " set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)`"+ + " logformat csv outfile %s", csvFile) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing + // file as we specified "outfile append". That works transparently for any mapreduce query + // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap + // command. 
+ for i := 0; i < 2; i++ { + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", inFile) + + if err != nil { + t.Error(err) + return + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + } + + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + t.Error(err) + return + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + t.Error(err) + return + } + + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) +} diff --git a/internal/mapr/logformat/csv.go b/internal/mapr/logformat/csv.go new file mode 100644 index 0000000..ea85ca9 --- /dev/null +++ b/internal/mapr/logformat/csv.go @@ -0,0 +1,53 @@ +package logformat + +import ( + "fmt" + "strings" + + "github.com/mimecast/dtail/internal/protocol" +) + +type csvParser struct { + defaultParser + header []string + hasHeader bool +} + +func newCSVParser(hostname, timeZoneName string, timeZoneOffset int) (*csvParser, error) { + defaultParser, err := newDefaultParser(hostname, timeZoneName, timeZoneOffset) + if err != nil { + return &csvParser{}, err + } + return &csvParser{defaultParser: *defaultParser}, nil +} + +func (p *csvParser) MakeFields(maprLine string) (map[string]string, error) { + if !p.hasHeader { + p.parseHeader(maprLine) + return nil, ErrIgnoreFields + } + + fields := make(map[string]string, 7+len(p.header)) + fields["*"] = "*" + fields["$hostname"] = p.hostname + fields["$server"] = p.hostname + fields["$line"] = maprLine + fields["$empty"] = "" + fields["$timezone"] = p.timeZoneName + fields["$timeoffset"] = p.timeZoneOffset + + splitted := strings.Split(maprLine, protocol.CSVDelimiter) + for i, value := range splitted { + if i >= len(p.header) { + return fields, fmt.Errorf("CSV file seems corrupted, more fields than header values?") + } + fields[p.header[i]] = value + } + + return fields, nil +} + +func (p *csvParser) 
parseHeader(maprLine string) { + p.header = strings.Split(maprLine, protocol.CSVDelimiter) + p.hasHeader = true +} diff --git a/internal/mapr/setcondition.go b/internal/mapr/setcondition.go index 9dcd690..308a0f4 100644 --- a/internal/mapr/setcondition.go +++ b/internal/mapr/setcondition.go @@ -37,6 +37,13 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { return sc, nil, err } + // Seems like a quoted string? E.g.: "set $foo = `count(bar)`" + // So don't interpret `count` as a function! + if tokens[2].quotesStripped { + sc.rType = Field + return sc, tokens[3:], nil + } + // Seems like a function call? if strings.HasSuffix(sc.rString, ")") { functionStack, functionArg, err := funcs.NewFunctionStack(tokens[2].str) |
