From 5978480b49c152e458055bb3a6b1b4ba9afa54e9 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 29 Jun 2023 09:55:23 +0300 Subject: add email-notification support --- internal/quorum/notify.go | 12 +++---- internal/quorum/quorum.go | 73 ++++++++++++++++++++++++++---------------- internal/quorum/quorum_test.go | 2 +- 3 files changed, 52 insertions(+), 35 deletions(-) (limited to 'internal') diff --git a/internal/quorum/notify.go b/internal/quorum/notify.go index b6e6b5d..7761b4d 100644 --- a/internal/quorum/notify.go +++ b/internal/quorum/notify.go @@ -17,7 +17,7 @@ func notify(conf config.Config, subject, body string) error { return nil } - log.Println("notify", subject, body) + log.Println("notify:", subject, body) headers := map[string]string{ "From": conf.EmailFrom, @@ -40,11 +40,9 @@ func notify(conf config.Config, subject, body string) error { } func notifyError(conf config.Config, err error) { - if !notifyEnabled(conf) { - return - } - - if err := notify(conf, fmt.Sprintf("GORUM: An error occured: %v", err), err.Error()); err != nil { - log.Println("error: ", err) + if notifyEnabled(conf) { + if err := notify(conf, fmt.Sprintf("GORUM: An error occured: %v", err), err.Error()); err != nil { + log.Println("error:", err) + } } } diff --git a/internal/quorum/quorum.go b/internal/quorum/quorum.go index d33db56..bbdc640 100644 --- a/internal/quorum/quorum.go +++ b/internal/quorum/quorum.go @@ -42,7 +42,6 @@ func (quo Quorum) Start(ctx context.Context) <-chan vote.Vote { interval := time.Second * time.Duration(quo.conf.LoopIntervalS) if vote.Expiry <= interval { - // TODO: When to use log.Fatal and when to use panic? log.Fatal("quorum: LoopIntervalS ", quo.conf.LoopIntervalS, " should be less than the vote expiry of ", vote.Expiry) } @@ -50,23 +49,37 @@ func (quo Quorum) Start(ctx context.Context) <-chan vote.Vote { go func() { defer close(ch) + var ( + myVote vote.Vote + changed bool + ) + for { select { case <-time.After(interval): - myVote, _ := quo.makeMyVote() + myVote, _ = quo.makeMyVote() log.Println("quorum: made my vote:", myVote) ch <- myVote case v := <-quo.voteCh: quo.vote(v) - if myVote, changed := quo.makeMyVote(); changed { + if myVote, changed = quo.makeMyVote(); changed { log.Println("quorum: changed my vote:", myVote) ch <- myVote } case <-ctx.Done(): return } - if err := quo.persist(); err != nil { + + scoresStr, err := quo.persist() + if err != nil { log.Println("quorum:", err) + notifyError(quo.conf, err) + continue + } + if changed { + if err := notify(quo.conf, "GORUM: Quorum changed", scoresStr); err != nil { + log.Println("quorum:", err) + } } } }() @@ -109,26 +122,10 @@ func (quo Quorum) scores() (scores []Score) { return } -func (quo *Quorum) persist() error { +func (quo *Quorum) str() string { scores := quo.scores() log.Println("quorum scores:", scores) - if _, err := os.Stat(quo.conf.StateDir); os.IsNotExist(err) { - if err := os.MkdirAll(quo.conf.StateDir, 0755); err != nil { - return err - } - } - - stateFile := fmt.Sprintf("%s/scores", quo.conf.StateDir) - stateTmpFile := fmt.Sprintf("%s.tmp", stateFile) - - // TODO: Also create a "is_leader" marker file in the fs! - fd, err := os.Create(stateTmpFile) - if err != nil { - return err - } - defer fd.Close() - var sb strings.Builder for i, score := range scores { sb.WriteString("At position ") @@ -150,18 +147,40 @@ func (quo *Quorum) persist() error { sb.WriteString("\n") } - if _, err := fd.WriteString(sb.String()); err != nil { - return err + return sb.String() +} + +func (quo *Quorum) persist() (string, error) { + scoresStr := quo.str() + + if _, err := os.Stat(quo.conf.StateDir); os.IsNotExist(err) { + if err := os.MkdirAll(quo.conf.StateDir, 0755); err != nil { + return scoresStr, err + } + } + + stateFile := fmt.Sprintf("%s/scores", quo.conf.StateDir) + stateTmpFile := fmt.Sprintf("%s.tmp", stateFile) + + // TODO: Also create a "is_leader" marker file in the fs! + fd, err := os.Create(stateTmpFile) + if err != nil { + return scoresStr, err + } + defer fd.Close() + + if _, err := fd.WriteString(scoresStr); err != nil { + return scoresStr, err } if err := fd.Sync(); err != nil { - return err + return scoresStr, err } - return os.Rename(stateTmpFile, stateFile) + return scoresStr, os.Rename(stateTmpFile, stateFile) } func (quo *Quorum) makeMyVote() (vote.Vote, bool) { - newVote, err := quo.pruneVotes() + newVote, err := quo.expireOldVotes() if err != nil { log.Println("quorum:", err) return quo.myVote, false @@ -175,7 +194,7 @@ func (quo *Quorum) makeMyVote() (vote.Vote, bool) { return quo.myVote, true } -func (quo Quorum) pruneVotes() (vote.Vote, error) { +func (quo Quorum) expireOldVotes() (vote.Vote, error) { var expired []string var live []string diff --git a/internal/quorum/quorum_test.go b/internal/quorum/quorum_test.go index fd51fb6..3ae87c6 100644 --- a/internal/quorum/quorum_test.go +++ b/internal/quorum/quorum_test.go @@ -134,7 +134,7 @@ func TestExpire(t *testing.T) { t.Errorf("Expected to have two votes before expiry: %v", quo) } - newVote, _ := quo.pruneVotes() + newVote, _ := quo.expireOldVotes() if len(quo.votes) != 1 { t.Errorf("Expected to have one vote after expiry: %v", quo) } -- cgit v1.2.3