summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-27 06:31:57 +0200
committerPaul Buetow <paul@buetow.org>2026-03-27 06:31:57 +0200
commit508c2c50325e8436e6700445bf8e811e4228c4b7 (patch)
tree1a16fe46836857e79839ea8d21c7bf7ab49d3553
parent1a8c37f44ee42f01574346f429813925d461effd (diff)
Implement Raft heartbeat dispatch c7885f70-033c-42e2-b831-50c6d3cb315a
-rw-r--r--src/main/java/protocols/implementations/VSRaftProtocol.java87
-rw-r--r--src/test/java/protocols/implementations/VSRaftProtocolTest.java74
2 files changed, 153 insertions, 8 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java
index 6828b34..53ead3c 100644
--- a/src/main/java/protocols/implementations/VSRaftProtocol.java
+++ b/src/main/java/protocols/implementations/VSRaftProtocol.java
@@ -306,15 +306,86 @@ public class VSRaftProtocol extends VSAbstractProtocol {
private void handleMessage(VSMessage recvMessage) {
String messageType = recvMessage.getString("type");
- if ("voteRequest".equals(messageType)) {
- handleVoteRequest(recvMessage);
- } else if ("voteResponse".equals(messageType)) {
- handleVoteResponse(recvMessage);
- } else if ("appendEntry".equals(messageType)) {
- handleAppendEntry(recvMessage);
- } else if ("appendAck".equals(messageType)) {
- handleAppendAck(recvMessage);
+ if (messageType == null) {
+ return;
+ }
+
+ switch (messageType) {
+ case "heartbeat":
+ handleHeartbeat(recvMessage);
+ break;
+ case "heartbeatAck":
+ handleHeartbeatAck(recvMessage);
+ break;
+ case "voteRequest":
+ handleVoteRequest(recvMessage);
+ break;
+ case "voteResponse":
+ handleVoteResponse(recvMessage);
+ break;
+ case "appendEntry":
+ handleAppendEntry(recvMessage);
+ break;
+ case "appendAck":
+ handleAppendAck(recvMessage);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Handles an incoming heartbeat from the current leader.
+ *
+ * @param recvMessage the heartbeat message
+ */
+ private void handleHeartbeat(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+ int messageLeaderId = recvMessage.getInteger("leaderId");
+
+ if (messageTerm < currentTerm) {
+ return;
}
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, messageLeaderId);
+ } else {
+ leaderId = messageLeaderId;
+ isLeader = false;
+ isCandidate = false;
+ resetElectionTimeout();
+ }
+
+ lastHeartbeatTime = process.getTime();
+
+ VSMessage heartbeatAck = new VSMessage();
+ heartbeatAck.setString("type", "heartbeatAck");
+ heartbeatAck.setInteger("term", currentTerm);
+ heartbeatAck.setInteger("pid", process.getProcessID());
+ heartbeatAck.setInteger("targetPid", messageLeaderId);
+ sendMessage(heartbeatAck);
+ }
+
+ /**
+ * Handles an incoming heartbeat acknowledgement on the leader.
+ *
+ * @param recvMessage the heartbeat acknowledgement
+ */
+ private void handleHeartbeatAck(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
+ Integer responderPid = recvMessage.getIntegerObj("pid");
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, -1);
+ return;
+ }
+
+ if (!isLeader || !isForMe(recvMessage) || responderPid == null ||
+ messageTerm != currentTerm) {
+ return;
+ }
+
+ log("Heartbeat ACK from process " + responderPid + " received");
}
/**
diff --git a/src/test/java/protocols/implementations/VSRaftProtocolTest.java b/src/test/java/protocols/implementations/VSRaftProtocolTest.java
index b249761..3e8f3fa 100644
--- a/src/test/java/protocols/implementations/VSRaftProtocolTest.java
+++ b/src/test/java/protocols/implementations/VSRaftProtocolTest.java
@@ -379,6 +379,44 @@ class VSRaftProtocolTest {
}
@Test
+ void testClientReceiveHeartbeatBecomesFollowerResetsTimeoutAndSendsAck()
+ throws Exception {
+ protocol.currentContextIsServer(false);
+ protocol.onClientInit();
+ clearInvocations(mockProcess, mockTaskManager);
+ setIntField("currentTerm", 1);
+ setBooleanField("isCandidate", true);
+ when(mockProcess.getTime()).thenReturn(350L, 350L);
+
+ VSMessage heartbeat = new VSMessage();
+ heartbeat.setString("type", "heartbeat");
+ heartbeat.setInteger("term", 2);
+ heartbeat.setInteger("leaderId", 11);
+
+ ArgumentCaptor<VSMessage> messageCaptor =
+ ArgumentCaptor.forClass(VSMessage.class);
+ ArgumentCaptor<VSTask> taskCaptor = ArgumentCaptor.forClass(VSTask.class);
+
+ protocol.onClientRecv(heartbeat);
+
+ verify(mockProcess).sendMessage(messageCaptor.capture());
+ verify(mockTaskManager, times(2)).removeAllTasks(any());
+ verify(mockTaskManager).addTask(taskCaptor.capture());
+
+ VSMessage heartbeatAck = messageCaptor.getValue();
+ assertEquals("heartbeatAck", heartbeatAck.getString("type"));
+ assertEquals(2, heartbeatAck.getInteger("term"));
+ assertEquals(7, heartbeatAck.getInteger("pid"));
+ assertEquals(11, heartbeatAck.getInteger("targetPid"));
+ assertEquals(2, getIntField("currentTerm"));
+ assertEquals(11, getIntField("leaderId"));
+ assertFalse(getBooleanField("isLeader"));
+ assertFalse(getBooleanField("isCandidate"));
+ assertEquals(350L, getLongField("lastHeartbeatTime"));
+ assertEquals(4850L, taskCaptor.getValue().getTaskTime());
+ }
+
+ @Test
void testVoteResponseMajorityPromotesCandidateToLeader() throws Exception {
protocol.currentContextIsServer(false);
setIntField("currentTerm", 3);
@@ -477,6 +515,42 @@ class VSRaftProtocolTest {
}
@Test
+ void testServerReceiveHeartbeatAckForLeaderLogsAck() throws Exception {
+ protocol.onStart();
+ clearInvocations(mockProcess, mockTaskManager);
+
+ VSMessage heartbeatAck = new VSMessage();
+ heartbeatAck.setString("type", "heartbeatAck");
+ heartbeatAck.setInteger("term", 0);
+ heartbeatAck.setInteger("pid", 2);
+ heartbeatAck.setInteger("targetPid", 7);
+
+ protocol.onServerRecv(heartbeatAck);
+
+ verify(mockProcess).log("Heartbeat ACK from process 2 received");
+ verify(mockTaskManager, never()).removeAllTasks(any());
+ verify(mockTaskManager, never()).addTask(any());
+ }
+
+ @Test
+ void testHeartbeatAckForDifferentTargetDoesNotLog() throws Exception {
+ protocol.onStart();
+ clearInvocations(mockProcess, mockTaskManager);
+
+ VSMessage heartbeatAck = new VSMessage();
+ heartbeatAck.setString("type", "heartbeatAck");
+ heartbeatAck.setInteger("term", 0);
+ heartbeatAck.setInteger("pid", 2);
+ heartbeatAck.setInteger("targetPid", 99);
+
+ protocol.onServerRecv(heartbeatAck);
+
+ verify(mockProcess, never()).log(anyString());
+ verify(mockTaskManager, never()).removeAllTasks(any());
+ verify(mockTaskManager, never()).addTask(any());
+ }
+
+ @Test
void testDuplicateVoteResponsesFromSamePeerDoNotCreateMajority()
throws Exception {
protocol.currentContextIsServer(false);