diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/client/client.go | 15 | ||||
| -rw-r--r-- | internal/client/tcpclient.go | 11 | ||||
| -rw-r--r-- | internal/run.go | 4 | ||||
| -rw-r--r-- | internal/server/server.go | 9 | ||||
| -rw-r--r-- | internal/vote/vote.go | 12 | ||||
| -rw-r--r-- | internal/vote/vote_test.go | 4 |
6 files changed, 35 insertions, 20 deletions
diff --git a/internal/client/client.go b/internal/client/client.go index 91d64e2..3cb2775 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -6,11 +6,12 @@ import ( "time" "codeberg.org/snonux/gorum/internal/config" + "codeberg.org/snonux/gorum/internal/vote" ) -func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) { +func Start(ctx context.Context, conf config.Config, myVoteCh <-chan vote.Vote) { log.Println("client: starting") - fanOut := make([]chan []string, len(conf.Nodes)) + fanOut := make([]chan vote.Vote, len(conf.Nodes)) for i, node := range conf.Nodes { fanOut[i] = startConnection(ctx, node) @@ -25,8 +26,8 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) for { select { - case liveNodes := <-liveNodesCh: - log.Printf("client: notifying live nodes %v to all partner nodes", liveNodes) + case myVote := <-myVoteCh: + log.Printf("client: notifying live nodes %v to all partner nodes", myVote) for _, ch := range fanOut { // First, clear previous element of the channel, if any select { @@ -34,7 +35,7 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) default: } // Now, update channel with the new live nodes. - ch <- liveNodes + ch <- myVote } case <-ctx.Done(): return @@ -43,8 +44,8 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) }() } -func startConnection(ctx context.Context, node string) chan []string { - ch := make(chan []string, 1) +func startConnection(ctx context.Context, node string) chan vote.Vote { + ch := make(chan vote.Vote, 1) go func() { for { diff --git a/internal/client/tcpclient.go b/internal/client/tcpclient.go index 58004c7..83cb3b6 100644 --- a/internal/client/tcpclient.go +++ b/internal/client/tcpclient.go @@ -6,10 +6,11 @@ import ( "io/ioutil" "log" "net" - "strings" + + "codeberg.org/snonux/gorum/internal/vote" ) -func tcpClientRun(ctx context.Context, node string, ch <-chan []string) error { +func tcpClientRun(ctx context.Context, node string, ch <-chan vote.Vote) error { conn, err := net.Dial("tcp", node) if err != nil { return err @@ -22,7 +23,11 @@ func tcpClientRun(ctx context.Context, node string, ch <-chan []string) error { return fmt.Errorf("channel closed - breaking tcpClientRun loop") } - message := strings.Join(votes, " ") + message, err := votes.ToJSON() + if err != nil { + return err + } + log.Println("tcpclient: sending", message, "to node", node) _, err = conn.Write([]byte(message)) if err != nil { diff --git a/internal/run.go b/internal/run.go index b069472..15c661f 100644 --- a/internal/run.go +++ b/internal/run.go @@ -16,9 +16,9 @@ func Run(ctx context.Context, configFile string) error { } quo := quorum.New(conf) - liveNodesCh := quo.Start(ctx) + myVoteCh := quo.Start(ctx) server.Start(ctx, conf, quo) - client.Start(ctx, conf, liveNodesCh) + client.Start(ctx, conf, myVoteCh) <-ctx.Done() return nil diff --git a/internal/server/server.go b/internal/server/server.go index e9d62f0..c3e9e05 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -45,7 +45,12 @@ func runServer(ctx context.Context, conf config.Config, quo quorum.Quorum) error }() return tcpServerRun(serverCtx, conf, func(message string) string { - ch <- vote.New(conf, message) - return "ok - I received your message, dear client" + v, err := vote.NewFromJSON(message) + if err != nil { + return err.Error() + } + + ch <- v + return "I received your message, dear client" }) } diff --git a/internal/vote/vote.go b/internal/vote/vote.go index 137beed..8ad5a37 100644 --- a/internal/vote/vote.go +++ b/internal/vote/vote.go @@ -26,16 +26,20 @@ func New(ids []string) (Vote, error) { return v, nil } -func NewFromJSON(bytes []byte) (v Vote, err error) { - if err = json.Unmarshal(bytes, &v); err != nil { +func NewFromJSON(jsonStr string) (v Vote, err error) { + if err = json.Unmarshal([]byte(jsonStr), &v); err != nil { return } v.ExpiresAt = time.Now().Add(Expiry) return } -func (v Vote) ToJSON() ([]byte, error) { - return json.Marshal(v) +func (v Vote) ToJSON() (string, error) { + bytes, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(bytes), nil } func (v Vote) Expired() bool { diff --git a/internal/vote/vote_test.go b/internal/vote/vote_test.go index 02c4af0..a2a851b 100644 --- a/internal/vote/vote_test.go +++ b/internal/vote/vote_test.go @@ -45,12 +45,12 @@ func TestVoteExpiry(t *testing.T) { func TestMarshalling(t *testing.T) { v, _ := New([]string{"foo", "bar", "baz", "bay"}) - bytes, err := v.ToJSON() + jsonStr, err := v.ToJSON() if err != nil { t.Errorf("unable to serialize vote to json: %v", err) } - v2, err := NewFromJSON(bytes) + v2, err := NewFromJSON(jsonStr) if err != nil { t.Errorf("unable to deserialize json to vote: %v", err) } |
