diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2024-03-06 10:59:03 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2024-03-06 10:59:03 +0000 |
| commit | 49444cd13d78e540a88ff00b3dde5124ef770abd (patch) | |
| tree | 1ff51310dc2e001cf8aaf71c763023bba9f6656e | |
| parent | a3e10757a52fa47a0608afd88986162ca5eb22cc (diff) | |
| parent | c8265ec16e086c988868d8cc3b7bd854d213be90 (diff) | |
Merge branch 'PS-34477-csv.tmp' into 'master'v4.3.2
only rename .csv.tmp to .csv when the final result was written
See merge request Storage/dtail!8
| -rw-r--r-- | internal/clients/maprclient.go | 14 | ||||
| -rw-r--r-- | internal/mapr/globalgroupset.go | 4 | ||||
| -rw-r--r-- | internal/mapr/groupsetresult.go | 8 |
3 files changed, 13 insertions, 13 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 2bc66a4..440cb91 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -101,7 +101,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) if c.cumulative { dlog.Client.Debug("Received final mapreduce result") - c.reportResults() + c.reportResults(true) } return @@ -145,16 +145,16 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { select { case <-time.After(c.query.Interval): dlog.Client.Debug("Gathering interim mapreduce result") - c.reportResults() + c.reportResults(false) case <-ctx.Done(): return } } } -func (c *MaprClient) reportResults() { +func (c *MaprClient) reportResults(finalResult bool) { if c.query.HasOutfile() { - c.writeResultsToOutfile() + c.writeResultsToOutfile(finalResult) return } c.printResults() @@ -208,14 +208,14 @@ func (c *MaprClient) printResults() { dlog.Client.Raw(fmt.Sprintf("%s\n", result)) } -func (c *MaprClient) writeResultsToOutfile() { +func (c *MaprClient) writeResultsToOutfile(finalResult bool) { if c.cumulative { - if err := c.globalGroup.WriteResult(c.query); err != nil { + if err := c.globalGroup.WriteResult(c.query, finalResult); err != nil { dlog.Client.FatalPanic(err) } return } - if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { + if err := c.globalGroup.SwapOut().WriteResult(c.query, true); err != nil { dlog.Client.FatalPanic(err) } } diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index 2d7f10b..2b12898 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -79,10 +79,10 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { } // WriteResult writes the result of a mapreduce aggregation to an outfile. -func (g *GlobalGroupSet) WriteResult(query *Query) error { +func (g *GlobalGroupSet) WriteResult(query *Query, finalResult bool) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.WriteResult(query) + return g.GroupSet.WriteResult(query, finalResult) } // Result returns the result of the mapreduce aggregation as a string. diff --git a/internal/mapr/groupsetresult.go b/internal/mapr/groupsetresult.go index 58663b8..47bdab8 100644 --- a/internal/mapr/groupsetresult.go +++ b/internal/mapr/groupsetresult.go @@ -176,7 +176,7 @@ func (*GroupSet) writeQueryFile(query *Query) error { } // WriteResult writes the result to an CSV outfile. -func (g *GroupSet) WriteResult(query *Query) error { +func (g *GroupSet) WriteResult(query *Query, finalResult bool) error { if !query.HasOutfile() { return errors.New("No outfile specified") } @@ -204,7 +204,7 @@ func (g *GroupSet) WriteResult(query *Query) error { } defer fd.Close() - return g.resultWriteUnformatted(query, rows, fd, writeHeader) + return g.resultWriteUnformatted(query, rows, fd, writeHeader, finalResult) } func (g *GroupSet) getOutfileFD(query *Query) (*os.File, error) { @@ -218,7 +218,7 @@ func (g *GroupSet) getOutfileFD(query *Query) (*os.File, error) { return os.OpenFile(query.Outfile.FilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) } -func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.File, writeHeader bool) error { +func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.File, writeHeader, finalResult bool) error { lastColumn := len(query.Select) - 1 if writeHeader { @@ -248,7 +248,7 @@ func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.Fi } } - if !query.Outfile.AppendMode { + if !query.Outfile.AppendMode && finalResult { tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile.FilePath) if err := os.Rename(tmpOutfile, query.Outfile.FilePath); err != nil { os.Remove(tmpOutfile) |
