summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2024-03-06 10:59:03 +0000
committerPaul Buetow <pbuetow@mimecast.com>2024-03-06 10:59:03 +0000
commit49444cd13d78e540a88ff00b3dde5124ef770abd (patch)
tree1ff51310dc2e001cf8aaf71c763023bba9f6656e
parenta3e10757a52fa47a0608afd88986162ca5eb22cc (diff)
parentc8265ec16e086c988868d8cc3b7bd854d213be90 (diff)
Merge branch 'PS-34477-csv.tmp' into 'master'v4.3.2
only rename .csv.tmp to .csv when the final result was written See merge request Storage/dtail!8
-rw-r--r--internal/clients/maprclient.go14
-rw-r--r--internal/mapr/globalgroupset.go4
-rw-r--r--internal/mapr/groupsetresult.go8
3 files changed, 13 insertions, 13 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 2bc66a4..440cb91 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -101,7 +101,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
dlog.Client.Debug("Received final mapreduce result")
- c.reportResults()
+ c.reportResults(true)
}
return
@@ -145,16 +145,16 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) {
select {
case <-time.After(c.query.Interval):
dlog.Client.Debug("Gathering interim mapreduce result")
- c.reportResults()
+ c.reportResults(false)
case <-ctx.Done():
return
}
}
}
-func (c *MaprClient) reportResults() {
+func (c *MaprClient) reportResults(finalResult bool) {
if c.query.HasOutfile() {
- c.writeResultsToOutfile()
+ c.writeResultsToOutfile(finalResult)
return
}
c.printResults()
@@ -208,14 +208,14 @@ func (c *MaprClient) printResults() {
dlog.Client.Raw(fmt.Sprintf("%s\n", result))
}
-func (c *MaprClient) writeResultsToOutfile() {
+func (c *MaprClient) writeResultsToOutfile(finalResult bool) {
if c.cumulative {
- if err := c.globalGroup.WriteResult(c.query); err != nil {
+ if err := c.globalGroup.WriteResult(c.query, finalResult); err != nil {
dlog.Client.FatalPanic(err)
}
return
}
- if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
+ if err := c.globalGroup.SwapOut().WriteResult(c.query, true); err != nil {
dlog.Client.FatalPanic(err)
}
}
diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go
index 2d7f10b..2b12898 100644
--- a/internal/mapr/globalgroupset.go
+++ b/internal/mapr/globalgroupset.go
@@ -79,10 +79,10 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet {
}
// WriteResult writes the result of a mapreduce aggregation to an outfile.
-func (g *GlobalGroupSet) WriteResult(query *Query) error {
+func (g *GlobalGroupSet) WriteResult(query *Query, finalResult bool) error {
g.semaphore <- struct{}{}
defer func() { <-g.semaphore }()
- return g.GroupSet.WriteResult(query)
+ return g.GroupSet.WriteResult(query, finalResult)
}
// Result returns the result of the mapreduce aggregation as a string.
diff --git a/internal/mapr/groupsetresult.go b/internal/mapr/groupsetresult.go
index 58663b8..47bdab8 100644
--- a/internal/mapr/groupsetresult.go
+++ b/internal/mapr/groupsetresult.go
@@ -176,7 +176,7 @@ func (*GroupSet) writeQueryFile(query *Query) error {
}
// WriteResult writes the result to an CSV outfile.
-func (g *GroupSet) WriteResult(query *Query) error {
+func (g *GroupSet) WriteResult(query *Query, finalResult bool) error {
if !query.HasOutfile() {
return errors.New("No outfile specified")
}
@@ -204,7 +204,7 @@ func (g *GroupSet) WriteResult(query *Query) error {
}
defer fd.Close()
- return g.resultWriteUnformatted(query, rows, fd, writeHeader)
+ return g.resultWriteUnformatted(query, rows, fd, writeHeader, finalResult)
}
func (g *GroupSet) getOutfileFD(query *Query) (*os.File, error) {
@@ -218,7 +218,7 @@ func (g *GroupSet) getOutfileFD(query *Query) (*os.File, error) {
return os.OpenFile(query.Outfile.FilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
}
-func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.File, writeHeader bool) error {
+func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.File, writeHeader, finalResult bool) error {
lastColumn := len(query.Select) - 1
if writeHeader {
@@ -248,7 +248,7 @@ func (g *GroupSet) resultWriteUnformatted(query *Query, rows []result, fd *os.Fi
}
}
- if !query.Outfile.AppendMode {
+ if !query.Outfile.AppendMode && finalResult {
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile.FilePath)
if err := os.Rename(tmpOutfile, query.Outfile.FilePath); err != nil {
os.Remove(tmpOutfile)