diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-27 06:31:57 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-27 06:31:57 +0200 |
| commit | 508c2c50325e8436e6700445bf8e811e4228c4b7 (patch) | |
| tree | 1a16fe46836857e79839ea8d21c7bf7ab49d3553 /src/main | |
| parent | 1a8c37f44ee42f01574346f429813925d461effd (diff) | |
Implement Raft heartbeat dispatch c7885f70-033c-42e2-b831-50c6d3cb315a
Diffstat (limited to 'src/main')
| -rw-r--r-- | src/main/java/protocols/implementations/VSRaftProtocol.java | 87 |
1 files changed, 79 insertions, 8 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 6828b34..53ead3c 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -306,15 +306,86 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void handleMessage(VSMessage recvMessage) { String messageType = recvMessage.getString("type"); - if ("voteRequest".equals(messageType)) { - handleVoteRequest(recvMessage); - } else if ("voteResponse".equals(messageType)) { - handleVoteResponse(recvMessage); - } else if ("appendEntry".equals(messageType)) { - handleAppendEntry(recvMessage); - } else if ("appendAck".equals(messageType)) { - handleAppendAck(recvMessage); + if (messageType == null) { + return; + } + + switch (messageType) { + case "heartbeat": + handleHeartbeat(recvMessage); + break; + case "heartbeatAck": + handleHeartbeatAck(recvMessage); + break; + case "voteRequest": + handleVoteRequest(recvMessage); + break; + case "voteResponse": + handleVoteResponse(recvMessage); + break; + case "appendEntry": + handleAppendEntry(recvMessage); + break; + case "appendAck": + handleAppendAck(recvMessage); + break; + default: + break; + } + } + + /** + * Handles an incoming heartbeat from the current leader. + * + * @param recvMessage the heartbeat message + */ + private void handleHeartbeat(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + int messageLeaderId = recvMessage.getInteger("leaderId"); + + if (messageTerm < currentTerm) { + return; } + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, messageLeaderId); + } else { + leaderId = messageLeaderId; + isLeader = false; + isCandidate = false; + resetElectionTimeout(); + } + + lastHeartbeatTime = process.getTime(); + + VSMessage heartbeatAck = new VSMessage(); + heartbeatAck.setString("type", "heartbeatAck"); + heartbeatAck.setInteger("term", currentTerm); + heartbeatAck.setInteger("pid", process.getProcessID()); + heartbeatAck.setInteger("targetPid", messageLeaderId); + sendMessage(heartbeatAck); + } + + /** + * Handles an incoming heartbeat acknowledgement on the leader. + * + * @param recvMessage the heartbeat acknowledgement + */ + private void handleHeartbeatAck(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + Integer responderPid = recvMessage.getIntegerObj("pid"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, -1); + return; + } + + if (!isLeader || !isForMe(recvMessage) || responderPid == null || + messageTerm != currentTerm) { + return; + } + + log("Heartbeat ACK from process " + responderPid + " received"); } /** |
