summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/client/client.go15
-rw-r--r--internal/client/tcpclient.go11
-rw-r--r--internal/run.go4
-rw-r--r--internal/server/server.go9
-rw-r--r--internal/vote/vote.go12
-rw-r--r--internal/vote/vote_test.go4
6 files changed, 35 insertions, 20 deletions
diff --git a/internal/client/client.go b/internal/client/client.go
index 91d64e2..3cb2775 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -6,11 +6,12 @@ import (
"time"
"codeberg.org/snonux/gorum/internal/config"
+ "codeberg.org/snonux/gorum/internal/vote"
)
-func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) {
+func Start(ctx context.Context, conf config.Config, myVoteCh <-chan vote.Vote) {
log.Println("client: starting")
- fanOut := make([]chan []string, len(conf.Nodes))
+ fanOut := make([]chan vote.Vote, len(conf.Nodes))
for i, node := range conf.Nodes {
fanOut[i] = startConnection(ctx, node)
@@ -25,8 +26,8 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string)
for {
select {
- case liveNodes := <-liveNodesCh:
- log.Printf("client: notifying live nodes %v to all partner nodes", liveNodes)
+ case myVote := <-myVoteCh:
+ log.Printf("client: notifying live nodes %v to all partner nodes", myVote)
for _, ch := range fanOut {
// First, clear previous element of the channel, if any
select {
@@ -34,7 +35,7 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string)
default:
}
// Now, update channel with the new live nodes.
- ch <- liveNodes
+ ch <- myVote
}
case <-ctx.Done():
return
@@ -43,8 +44,8 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string)
}()
}
-func startConnection(ctx context.Context, node string) chan []string {
- ch := make(chan []string, 1)
+func startConnection(ctx context.Context, node string) chan vote.Vote {
+ ch := make(chan vote.Vote, 1)
go func() {
for {
diff --git a/internal/client/tcpclient.go b/internal/client/tcpclient.go
index 58004c7..83cb3b6 100644
--- a/internal/client/tcpclient.go
+++ b/internal/client/tcpclient.go
@@ -6,10 +6,11 @@ import (
"io/ioutil"
"log"
"net"
- "strings"
+
+ "codeberg.org/snonux/gorum/internal/vote"
)
-func tcpClientRun(ctx context.Context, node string, ch <-chan []string) error {
+func tcpClientRun(ctx context.Context, node string, ch <-chan vote.Vote) error {
conn, err := net.Dial("tcp", node)
if err != nil {
return err
@@ -22,7 +23,11 @@ func tcpClientRun(ctx context.Context, node string, ch <-chan []string) error {
return fmt.Errorf("channel closed - breaking tcpClientRun loop")
}
- message := strings.Join(votes, " ")
+ message, err := votes.ToJSON()
+ if err != nil {
+ return err
+ }
+
log.Println("tcpclient: sending", message, "to node", node)
_, err = conn.Write([]byte(message))
if err != nil {
diff --git a/internal/run.go b/internal/run.go
index b069472..15c661f 100644
--- a/internal/run.go
+++ b/internal/run.go
@@ -16,9 +16,9 @@ func Run(ctx context.Context, configFile string) error {
}
quo := quorum.New(conf)
- liveNodesCh := quo.Start(ctx)
+ myVoteCh := quo.Start(ctx)
server.Start(ctx, conf, quo)
- client.Start(ctx, conf, liveNodesCh)
+ client.Start(ctx, conf, myVoteCh)
<-ctx.Done()
return nil
diff --git a/internal/server/server.go b/internal/server/server.go
index e9d62f0..c3e9e05 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -45,7 +45,12 @@ func runServer(ctx context.Context, conf config.Config, quo quorum.Quorum) error
}()
return tcpServerRun(serverCtx, conf, func(message string) string {
- ch <- vote.New(conf, message)
- return "ok - I received your message, dear client"
+ v, err := vote.NewFromJSON(message)
+ if err != nil {
+ return err.Error()
+ }
+
+ ch <- v
+ return "I received your message, dear client"
})
}
diff --git a/internal/vote/vote.go b/internal/vote/vote.go
index 137beed..8ad5a37 100644
--- a/internal/vote/vote.go
+++ b/internal/vote/vote.go
@@ -26,16 +26,20 @@ func New(ids []string) (Vote, error) {
return v, nil
}
-func NewFromJSON(bytes []byte) (v Vote, err error) {
- if err = json.Unmarshal(bytes, &v); err != nil {
+func NewFromJSON(jsonStr string) (v Vote, err error) {
+ if err = json.Unmarshal([]byte(jsonStr), &v); err != nil {
return
}
v.ExpiresAt = time.Now().Add(Expiry)
return
}
-func (v Vote) ToJSON() ([]byte, error) {
- return json.Marshal(v)
+func (v Vote) ToJSON() (string, error) {
+ bytes, err := json.Marshal(v)
+ if err != nil {
+ return "", err
+ }
+ return string(bytes), nil
}
func (v Vote) Expired() bool {
diff --git a/internal/vote/vote_test.go b/internal/vote/vote_test.go
index 02c4af0..a2a851b 100644
--- a/internal/vote/vote_test.go
+++ b/internal/vote/vote_test.go
@@ -45,12 +45,12 @@ func TestVoteExpiry(t *testing.T) {
func TestMarshalling(t *testing.T) {
v, _ := New([]string{"foo", "bar", "baz", "bay"})
- bytes, err := v.ToJSON()
+ jsonStr, err := v.ToJSON()
if err != nil {
t.Errorf("unable to serialize vote to json: %v", err)
}
- v2, err := NewFromJSON(bytes)
+ v2, err := NewFromJSON(jsonStr)
if err != nil {
t.Errorf("unable to deserialize json to vote: %v", err)
}