summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-27 06:31:57 +0200
committerPaul Buetow <paul@buetow.org>2026-03-27 06:31:57 +0200
commit508c2c50325e8436e6700445bf8e811e4228c4b7 (patch)
tree1a16fe46836857e79839ea8d21c7bf7ab49d3553 /src/main
parent1a8c37f44ee42f01574346f429813925d461effd (diff)
Implement Raft heartbeat dispatch c7885f70-033c-42e2-b831-50c6d3cb315a
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/protocols/implementations/VSRaftProtocol.java87
1 files changed, 79 insertions, 8 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java
index 6828b34..53ead3c 100644
--- a/src/main/java/protocols/implementations/VSRaftProtocol.java
+++ b/src/main/java/protocols/implementations/VSRaftProtocol.java
@@ -306,15 +306,86 @@ public class VSRaftProtocol extends VSAbstractProtocol {
private void handleMessage(VSMessage recvMessage) {
String messageType = recvMessage.getString("type");
- if ("voteRequest".equals(messageType)) {
- handleVoteRequest(recvMessage);
- } else if ("voteResponse".equals(messageType)) {
- handleVoteResponse(recvMessage);
- } else if ("appendEntry".equals(messageType)) {
- handleAppendEntry(recvMessage);
- } else if ("appendAck".equals(messageType)) {
- handleAppendAck(recvMessage);
+ if (messageType == null) {
+ return;
+ }
+
+ switch (messageType) {
+ case "heartbeat":
+ handleHeartbeat(recvMessage);
+ break;
+ case "heartbeatAck":
+ handleHeartbeatAck(recvMessage);
+ break;
+ case "voteRequest":
+ handleVoteRequest(recvMessage);
+ break;
+ case "voteResponse":
+ handleVoteResponse(recvMessage);
+ break;
+ case "appendEntry":
+ handleAppendEntry(recvMessage);
+ break;
+ case "appendAck":
+ handleAppendAck(recvMessage);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Handles an incoming heartbeat from the current leader.
+ *
+ * @param recvMessage the heartbeat message
+ */
+ private void handleHeartbeat(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+ int messageLeaderId = recvMessage.getInteger("leaderId");
+
+ if (messageTerm < currentTerm) {
+ return;
}
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, messageLeaderId);
+ } else {
+ leaderId = messageLeaderId;
+ isLeader = false;
+ isCandidate = false;
+ resetElectionTimeout();
+ }
+
+ lastHeartbeatTime = process.getTime();
+
+ VSMessage heartbeatAck = new VSMessage();
+ heartbeatAck.setString("type", "heartbeatAck");
+ heartbeatAck.setInteger("term", currentTerm);
+ heartbeatAck.setInteger("pid", process.getProcessID());
+ heartbeatAck.setInteger("targetPid", messageLeaderId);
+ sendMessage(heartbeatAck);
+ }
+
+ /**
+ * Handles an incoming heartbeat acknowledgement on the leader.
+ *
+ * @param recvMessage the heartbeat acknowledgement
+ */
+ private void handleHeartbeatAck(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+ Integer responderPid = recvMessage.getIntegerObj("pid");
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, -1);
+ return;
+ }
+
+ if (!isLeader || !isForMe(recvMessage) || responderPid == null ||
+ messageTerm != currentTerm) {
+ return;
+ }
+
+ log("Heartbeat ACK from process " + responderPid + " received");
}
/**