diff options
| author | Paul Buetow <paul@buetow.org> | 2023-05-28 21:07:55 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2023-05-28 21:07:55 +0300 |
| commit | b1eea606aa22fef720b18017c25f850a5efbf059 (patch) | |
| tree | 593d4f1e1803dd7e1a4db20db2a1c1ec15fbea3c | |
| parent | 37fdae0b2a78a80729c8d8bfa69835f554b00120 (diff) | |
add fan out for client votes
| -rw-r--r-- | internal/client/client.go | 34 | ||||
| -rw-r--r-- | internal/quorum/quorum.go | 71 | ||||
| -rw-r--r-- | internal/quorum/quorum_test.go | 22 | ||||
| -rw-r--r-- | internal/run.go | 9 | ||||
| -rw-r--r-- | internal/server/server.go | 15 |
5 files changed, 112 insertions, 39 deletions
diff --git a/internal/client/client.go b/internal/client/client.go index 6c7fe79..76a61f0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -8,11 +8,11 @@ import ( "codeberg.org/snonux/gorum/internal/config" ) -func Start(ctx context.Context, conf config.Config) { +func Start(ctx context.Context, conf config.Config, winnerCh <-chan string) { go func() { for { log.Println("Starting client") - start(ctx, conf) + start(ctx, conf, winnerCh) select { case <-time.After(time.Second): @@ -23,5 +23,33 @@ func Start(ctx context.Context, conf config.Config) { }() } -func start(ctx context.Context, conf config.Config) { +func start(ctx context.Context, conf config.Config, winnerCh <-chan string) { + fanOut := make([]chan string, len(conf.Participants)) + + for i := 0; i < len(fanOut); i++ { + fanOut[i] = make(chan string, 1) + } + + go func() { + defer func() { + for _, ch := range fanOut { + close(ch) + } + }() + + for { + select { + case winner := <-winnerCh: + for _, ch := range fanOut { + select { + case <-ch: + default: + } + ch <- winner + } + case <-ctx.Done(): + return + } + } + }() } diff --git a/internal/quorum/quorum.go b/internal/quorum/quorum.go index ea73cd0..505fb9f 100644 --- a/internal/quorum/quorum.go +++ b/internal/quorum/quorum.go @@ -1,38 +1,87 @@ package quorum import ( + "context" "fmt" "log" "sort" + "time" "codeberg.org/snonux/gorum/internal/config" "codeberg.org/snonux/gorum/internal/vote" ) -type Quorum map[string]vote.Vote +type Quorum struct { + conf config.Config + votes map[string]vote.Vote +} type Score struct { ID string Value int } +func New(conf config.Config) Quorum { + return Quorum{ + conf: conf, + votes: make(map[string]vote.Vote), + } +} + +func (q Quorum) Start(ctx context.Context) chan string { + ch := make(chan string, 1) + + go func() { + defer close(ch) + var lastWinner string + + for { + select { + case <-time.After(vote.Expiry): + q.deleteExpiredVotes() + winner, err := q.winner() + if err != nil { + log.Println(err) + continue + } + if lastWinner == winner { + continue + } + lastWinner = winner + + // Remove current channel entry, and update with new winner + select { + case <-ch: + default: + } + ch <- winner + + case <-ctx.Done(): + return + } + } + }() + + return ch +} + func (q Quorum) Vote(v vote.Vote) { log.Printf("Adding vote %v", v) - q[v.From] = v + q.votes[v.From] = v } -func (q Quorum) Winner(conf config.Config) (string, error) { - scores := q.Score(conf) +func (q Quorum) winner() (string, error) { + scores := q.score() if len(scores) == 0 { return "", fmt.Errorf("unable to find a winner, empty score list") } return scores[0].ID, nil } -func (q Quorum) Score(conf config.Config) (scores []Score) { +func (q Quorum) score() (scores []Score) { scoreMap := make(map[string]int) - for _, vote := range q { + for _, vote := range q.votes { for _, id := range vote.IDs { score, _ := scoreMap[id] scoreMap[id] = score + 1 @@ -49,24 +98,24 @@ func (q Quorum) Score(conf config.Config) (scores []Score) { } // Score tie, use participant number. - i_, _ := conf.ParticipantNumber(scores[i].ID) - j_, _ := conf.ParticipantNumber(scores[j].ID) + i_, _ := q.conf.ParticipantNumber(scores[i].ID) + j_, _ := q.conf.ParticipantNumber(scores[j].ID) return i_ < j_ }) return } -func (q Quorum) Expire() { +func (q Quorum) deleteExpiredVotes() { var expired []string - for from, vote := range q { + for from, vote := range q.votes { if vote.Expired() { expired = append(expired, from) } } for _, e := range expired { - delete(q, e) + delete(q.votes, e) } } diff --git a/internal/quorum/quorum_test.go b/internal/quorum/quorum_test.go index 4a12d02..8ab1898 100644 --- a/internal/quorum/quorum_test.go +++ b/internal/quorum/quorum_test.go @@ -9,8 +9,8 @@ import ( ) func TestScore(t *testing.T) { - quo := make(Quorum) conf := config.Config{Participants: []string{"foo:1234", "bar:4321", "baz:3444"}} + quo := New(conf) vote1 := vote.New(conf, "foo:334234", "foo bar\n") vote1.ExpiresAt = time.Now().Add(1 * time.Hour) @@ -28,7 +28,7 @@ func TestScore(t *testing.T) { vote4.ExpiresAt = time.Now().Add(1 * time.Hour) quo.Vote(vote4) - scores := quo.Score(conf) + scores := quo.score(conf) if len(scores) != 3 { t.Errorf("Expected scores to be of length 3: %v", scores) } @@ -54,12 +54,12 @@ func TestTieScore(t *testing.T) { } t.Run("First tie score test", func(t *testing.T) { - quo := make(Quorum) + quo := New() // If it is a tie, the first particpant (here: "foo") will win. conf := config.Config{Participants: []string{"foo:1234", "bar:4321", "baz:3444"}} addVotes(conf, quo) - scores := quo.Score(conf) + scores := quo.score(conf) if len(scores) != 3 { t.Errorf("Expected scores to be of length 3: %v", scores) @@ -68,19 +68,19 @@ func TestTieScore(t *testing.T) { t.Errorf("Expected score[0] to be {foo,3}: %v", scores[0]) } - winner, _ := quo.Winner(conf) + winner, _ := quo.winner(conf) if winner != "foo" { t.Errorf("Expected the winner to be foo but is: %s", winner) } }) t.Run("Second tie score test", func(t *testing.T) { - quo := make(Quorum) + quo := New() // If it is a tie, the first particpant (here: "bar") will win. conf := config.Config{Participants: []string{"bar:1234", "foo:4321", "baz:3444"}} addVotes(conf, quo) - scores := quo.Score(conf) + scores := quo.score(conf) if len(scores) != 3 { t.Errorf("Expected scores to be of length 3: %v", scores) @@ -89,7 +89,7 @@ func TestTieScore(t *testing.T) { t.Errorf("Expected score[0] to be {bar,3}: %v", scores[0]) } - winner, _ := quo.Winner(conf) + winner, _ := quo.winner(conf) if winner != "bar" { t.Errorf("Expected the winner to be bar but is: %s", winner) } @@ -97,7 +97,7 @@ func TestTieScore(t *testing.T) { } func TestExpire(t *testing.T) { - quo := make(Quorum) + quo := New() conf := config.Config{Participants: []string{"foo:1234", "bay:4321"}} vote1 := vote.New(conf, "foo:334234", " foo bar baz bay\n") @@ -108,12 +108,12 @@ func TestExpire(t *testing.T) { vote2.ExpiresAt = time.Now() quo.Vote(vote2) - if len(quo) != 2 { + if len(quo.votes) != 2 { t.Errorf("Expected to have two votes before expiry: %v", quo) } quo.Expire() - if len(quo) != 1 { + if len(quo.votes) != 1 { t.Errorf("Expected to have only one vote after expiry: %v", quo) } } diff --git a/internal/run.go b/internal/run.go index b66d40d..08fe8dc 100644 --- a/internal/run.go +++ b/internal/run.go @@ -5,6 +5,7 @@ import ( "codeberg.org/snonux/gorum/internal/client" "codeberg.org/snonux/gorum/internal/config" + "codeberg.org/snonux/gorum/internal/quorum" "codeberg.org/snonux/gorum/internal/server" ) @@ -14,7 +15,11 @@ func Run(ctx context.Context, configFile string) error { return err } - client.Start(ctx, conf) - server.Start(ctx, conf) + quo := quorum.New(conf) + winnerCh := quo.Start(ctx) + + server.Start(ctx, conf, quo) + client.Start(ctx, conf, winnerCh) + return nil } diff --git a/internal/server/server.go b/internal/server/server.go index 088e36c..dc1fd90 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,11 +10,11 @@ import ( "codeberg.org/snonux/gorum/internal/vote" ) -func Start(ctx context.Context, conf config.Config) { +func Start(ctx context.Context, conf config.Config, quo quorum.Quorum) { go func() { for { log.Println("Starting server") - if err := run(ctx, conf); err != nil { + if err := run(ctx, conf, quo); err != nil { log.Println(err) } @@ -27,26 +27,17 @@ func Start(ctx context.Context, conf config.Config) { }() } -func run(ctx context.Context, conf config.Config) error { +func run(ctx context.Context, conf config.Config, quo quorum.Quorum) error { serverCtx, cancel := context.WithCancel(ctx) defer cancel() ch := make(chan vote.Vote) - quo := make(quorum.Quorum) go func() { for { select { case vote := <-ch: quo.Vote(vote) - winner, err := quo.Winner(conf) - if err != nil { - log.Println(err.Error()) - continue - } - log.Printf("The current leader node is %s", winner) - case <-time.After(vote.Expiry): - quo.Expire() case <-serverCtx.Done(): return } |
