summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2023-05-28 21:07:55 +0300
committerPaul Buetow <paul@buetow.org>2023-05-28 21:07:55 +0300
commitb1eea606aa22fef720b18017c25f850a5efbf059 (patch)
tree593d4f1e1803dd7e1a4db20db2a1c1ec15fbea3c
parent37fdae0b2a78a80729c8d8bfa69835f554b00120 (diff)
add fan out for client votes
-rw-r--r--internal/client/client.go34
-rw-r--r--internal/quorum/quorum.go71
-rw-r--r--internal/quorum/quorum_test.go22
-rw-r--r--internal/run.go9
-rw-r--r--internal/server/server.go15
5 files changed, 112 insertions, 39 deletions
diff --git a/internal/client/client.go b/internal/client/client.go
index 6c7fe79..76a61f0 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -8,11 +8,11 @@ import (
"codeberg.org/snonux/gorum/internal/config"
)
-func Start(ctx context.Context, conf config.Config) {
+func Start(ctx context.Context, conf config.Config, winnerCh <-chan string) {
go func() {
for {
log.Println("Starting client")
- start(ctx, conf)
+ start(ctx, conf, winnerCh)
select {
case <-time.After(time.Second):
@@ -23,5 +23,33 @@ func Start(ctx context.Context, conf config.Config) {
}()
}
-func start(ctx context.Context, conf config.Config) {
+func start(ctx context.Context, conf config.Config, winnerCh <-chan string) {
+ fanOut := make([]chan string, len(conf.Participants))
+
+ for i := 0; i < len(fanOut); i++ {
+ fanOut[i] = make(chan string, 1)
+ }
+
+ go func() {
+ defer func() {
+ for _, ch := range fanOut {
+ close(ch)
+ }
+ }()
+
+ for {
+ select {
+ case winner := <-winnerCh:
+ for _, ch := range fanOut {
+ select {
+ case <-ch:
+ default:
+ }
+ ch <- winner
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
}
diff --git a/internal/quorum/quorum.go b/internal/quorum/quorum.go
index ea73cd0..505fb9f 100644
--- a/internal/quorum/quorum.go
+++ b/internal/quorum/quorum.go
@@ -1,38 +1,87 @@
package quorum
import (
+ "context"
"fmt"
"log"
"sort"
+ "time"
"codeberg.org/snonux/gorum/internal/config"
"codeberg.org/snonux/gorum/internal/vote"
)
-type Quorum map[string]vote.Vote
+type Quorum struct {
+ conf config.Config
+ votes map[string]vote.Vote
+}
type Score struct {
ID string
Value int
}
+func New(conf config.Config) Quorum {
+ return Quorum{
+ conf: conf,
+ votes: make(map[string]vote.Vote),
+ }
+}
+
+func (q Quorum) Start(ctx context.Context) chan string {
+ ch := make(chan string, 1)
+
+ go func() {
+ defer close(ch)
+ var lastWinner string
+
+ for {
+ select {
+ case <-time.After(vote.Expiry):
+ q.deleteExpiredVotes()
+ winner, err := q.winner()
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ if lastWinner == winner {
+ continue
+ }
+ lastWinner = winner
+
+ // Remove current channel entry, and update with new winner
+ select {
+ case <-ch:
+ default:
+ }
+ ch <- winner
+
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return ch
+}
+
func (q Quorum) Vote(v vote.Vote) {
log.Printf("Adding vote %v", v)
- q[v.From] = v
+ q.votes[v.From] = v
}
-func (q Quorum) Winner(conf config.Config) (string, error) {
- scores := q.Score(conf)
+func (q Quorum) winner() (string, error) {
+ scores := q.score()
if len(scores) == 0 {
return "", fmt.Errorf("unable to find a winner, empty score list")
}
return scores[0].ID, nil
}
-func (q Quorum) Score(conf config.Config) (scores []Score) {
+func (q Quorum) score() (scores []Score) {
scoreMap := make(map[string]int)
- for _, vote := range q {
+ for _, vote := range q.votes {
for _, id := range vote.IDs {
score, _ := scoreMap[id]
scoreMap[id] = score + 1
@@ -49,24 +98,24 @@ func (q Quorum) Score(conf config.Config) (scores []Score) {
}
// Score tie, use participant number.
- i_, _ := conf.ParticipantNumber(scores[i].ID)
- j_, _ := conf.ParticipantNumber(scores[j].ID)
+ i_, _ := q.conf.ParticipantNumber(scores[i].ID)
+ j_, _ := q.conf.ParticipantNumber(scores[j].ID)
return i_ < j_
})
return
}
-func (q Quorum) Expire() {
+func (q Quorum) deleteExpiredVotes() {
var expired []string
- for from, vote := range q {
+ for from, vote := range q.votes {
if vote.Expired() {
expired = append(expired, from)
}
}
for _, e := range expired {
- delete(q, e)
+ delete(q.votes, e)
}
}
diff --git a/internal/quorum/quorum_test.go b/internal/quorum/quorum_test.go
index 4a12d02..8ab1898 100644
--- a/internal/quorum/quorum_test.go
+++ b/internal/quorum/quorum_test.go
@@ -9,8 +9,8 @@ import (
)
func TestScore(t *testing.T) {
- quo := make(Quorum)
conf := config.Config{Participants: []string{"foo:1234", "bar:4321", "baz:3444"}}
+ quo := New(conf)
vote1 := vote.New(conf, "foo:334234", "foo bar\n")
vote1.ExpiresAt = time.Now().Add(1 * time.Hour)
@@ -28,7 +28,7 @@ func TestScore(t *testing.T) {
vote4.ExpiresAt = time.Now().Add(1 * time.Hour)
quo.Vote(vote4)
- scores := quo.Score(conf)
+ scores := quo.score(conf)
if len(scores) != 3 {
t.Errorf("Expected scores to be of length 3: %v", scores)
}
@@ -54,12 +54,12 @@ func TestTieScore(t *testing.T) {
}
t.Run("First tie score test", func(t *testing.T) {
- quo := make(Quorum)
+ quo := New()
// If it is a tie, the first particpant (here: "foo") will win.
conf := config.Config{Participants: []string{"foo:1234", "bar:4321", "baz:3444"}}
addVotes(conf, quo)
- scores := quo.Score(conf)
+ scores := quo.score(conf)
if len(scores) != 3 {
t.Errorf("Expected scores to be of length 3: %v", scores)
@@ -68,19 +68,19 @@ func TestTieScore(t *testing.T) {
t.Errorf("Expected score[0] to be {foo,3}: %v", scores[0])
}
- winner, _ := quo.Winner(conf)
+ winner, _ := quo.winner(conf)
if winner != "foo" {
t.Errorf("Expected the winner to be foo but is: %s", winner)
}
})
t.Run("Second tie score test", func(t *testing.T) {
- quo := make(Quorum)
+ quo := New()
// If it is a tie, the first particpant (here: "bar") will win.
conf := config.Config{Participants: []string{"bar:1234", "foo:4321", "baz:3444"}}
addVotes(conf, quo)
- scores := quo.Score(conf)
+ scores := quo.score(conf)
if len(scores) != 3 {
t.Errorf("Expected scores to be of length 3: %v", scores)
@@ -89,7 +89,7 @@ func TestTieScore(t *testing.T) {
t.Errorf("Expected score[0] to be {bar,3}: %v", scores[0])
}
- winner, _ := quo.Winner(conf)
+ winner, _ := quo.winner(conf)
if winner != "bar" {
t.Errorf("Expected the winner to be bar but is: %s", winner)
}
@@ -97,7 +97,7 @@ func TestTieScore(t *testing.T) {
}
func TestExpire(t *testing.T) {
- quo := make(Quorum)
+ quo := New()
conf := config.Config{Participants: []string{"foo:1234", "bay:4321"}}
vote1 := vote.New(conf, "foo:334234", " foo bar baz bay\n")
@@ -108,12 +108,12 @@ func TestExpire(t *testing.T) {
vote2.ExpiresAt = time.Now()
quo.Vote(vote2)
- if len(quo) != 2 {
+ if len(quo.votes) != 2 {
t.Errorf("Expected to have two votes before expiry: %v", quo)
}
quo.Expire()
- if len(quo) != 1 {
+ if len(quo.votes) != 1 {
t.Errorf("Expected to have only one vote after expiry: %v", quo)
}
}
diff --git a/internal/run.go b/internal/run.go
index b66d40d..08fe8dc 100644
--- a/internal/run.go
+++ b/internal/run.go
@@ -5,6 +5,7 @@ import (
"codeberg.org/snonux/gorum/internal/client"
"codeberg.org/snonux/gorum/internal/config"
+ "codeberg.org/snonux/gorum/internal/quorum"
"codeberg.org/snonux/gorum/internal/server"
)
@@ -14,7 +15,11 @@ func Run(ctx context.Context, configFile string) error {
return err
}
- client.Start(ctx, conf)
- server.Start(ctx, conf)
+ quo := quorum.New(conf)
+ winnerCh := quo.Start(ctx)
+
+ server.Start(ctx, conf, quo)
+ client.Start(ctx, conf, winnerCh)
+
return nil
}
diff --git a/internal/server/server.go b/internal/server/server.go
index 088e36c..dc1fd90 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -10,11 +10,11 @@ import (
"codeberg.org/snonux/gorum/internal/vote"
)
-func Start(ctx context.Context, conf config.Config) {
+func Start(ctx context.Context, conf config.Config, quo quorum.Quorum) {
go func() {
for {
log.Println("Starting server")
- if err := run(ctx, conf); err != nil {
+ if err := run(ctx, conf, quo); err != nil {
log.Println(err)
}
@@ -27,26 +27,17 @@ func Start(ctx context.Context, conf config.Config) {
}()
}
-func run(ctx context.Context, conf config.Config) error {
+func run(ctx context.Context, conf config.Config, quo quorum.Quorum) error {
serverCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := make(chan vote.Vote)
- quo := make(quorum.Quorum)
go func() {
for {
select {
case vote := <-ch:
quo.Vote(vote)
- winner, err := quo.Winner(conf)
- if err != nil {
- log.Println(err.Error())
- continue
- }
- log.Printf("The current leader node is %s", winner)
- case <-time.After(vote.Expiry):
- quo.Expire()
case <-serverCtx.Done():
return
}