diff options
| -rw-r--r-- | src/main/java/protocols/implementations/VSRaftProtocol.java | 87 | ||||
| -rw-r--r-- | src/test/java/protocols/implementations/VSRaftProtocolTest.java | 74 |
2 files changed, 153 insertions, 8 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 6828b34..53ead3c 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -306,15 +306,86 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void handleMessage(VSMessage recvMessage) { String messageType = recvMessage.getString("type"); - if ("voteRequest".equals(messageType)) { - handleVoteRequest(recvMessage); - } else if ("voteResponse".equals(messageType)) { - handleVoteResponse(recvMessage); - } else if ("appendEntry".equals(messageType)) { - handleAppendEntry(recvMessage); - } else if ("appendAck".equals(messageType)) { - handleAppendAck(recvMessage); + if (messageType == null) { + return; + } + + switch (messageType) { + case "heartbeat": + handleHeartbeat(recvMessage); + break; + case "heartbeatAck": + handleHeartbeatAck(recvMessage); + break; + case "voteRequest": + handleVoteRequest(recvMessage); + break; + case "voteResponse": + handleVoteResponse(recvMessage); + break; + case "appendEntry": + handleAppendEntry(recvMessage); + break; + case "appendAck": + handleAppendAck(recvMessage); + break; + default: + break; + } + } + + /** + * Handles an incoming heartbeat from the current leader. + * + * @param recvMessage the heartbeat message + */ + private void handleHeartbeat(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + int messageLeaderId = recvMessage.getInteger("leaderId"); + + if (messageTerm < currentTerm) { + return; } + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, messageLeaderId); + } else { + leaderId = messageLeaderId; + isLeader = false; + isCandidate = false; + resetElectionTimeout(); + } + + lastHeartbeatTime = process.getTime(); + + VSMessage heartbeatAck = new VSMessage(); + heartbeatAck.setString("type", "heartbeatAck"); + heartbeatAck.setInteger("term", currentTerm); + heartbeatAck.setInteger("pid", process.getProcessID()); + heartbeatAck.setInteger("targetPid", messageLeaderId); + sendMessage(heartbeatAck); + } + + /** + * Handles an incoming heartbeat acknowledgement on the leader. + * + * @param recvMessage the heartbeat acknowledgement + */ + private void handleHeartbeatAck(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); + Integer responderPid = recvMessage.getIntegerObj("pid"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, -1); + return; + } + + if (!isLeader || !isForMe(recvMessage) || responderPid == null || + messageTerm != currentTerm) { + return; + } + + log("Heartbeat ACK from process " + responderPid + " received"); } /** diff --git a/src/test/java/protocols/implementations/VSRaftProtocolTest.java b/src/test/java/protocols/implementations/VSRaftProtocolTest.java index b249761..3e8f3fa 100644 --- a/src/test/java/protocols/implementations/VSRaftProtocolTest.java +++ b/src/test/java/protocols/implementations/VSRaftProtocolTest.java @@ -379,6 +379,44 @@ class VSRaftProtocolTest { } @Test + void testClientReceiveHeartbeatBecomesFollowerResetsTimeoutAndSendsAck() + throws Exception { + protocol.currentContextIsServer(false); + protocol.onClientInit(); + clearInvocations(mockProcess, mockTaskManager); + setIntField("currentTerm", 1); + setBooleanField("isCandidate", true); + when(mockProcess.getTime()).thenReturn(350L, 350L); + + VSMessage heartbeat = new VSMessage(); + heartbeat.setString("type", "heartbeat"); + heartbeat.setInteger("term", 2); + heartbeat.setInteger("leaderId", 11); + + ArgumentCaptor<VSMessage> messageCaptor = + ArgumentCaptor.forClass(VSMessage.class); + ArgumentCaptor<VSTask> taskCaptor = ArgumentCaptor.forClass(VSTask.class); + + protocol.onClientRecv(heartbeat); + + verify(mockProcess).sendMessage(messageCaptor.capture()); + verify(mockTaskManager, times(2)).removeAllTasks(any()); + verify(mockTaskManager).addTask(taskCaptor.capture()); + + VSMessage heartbeatAck = messageCaptor.getValue(); + assertEquals("heartbeatAck", heartbeatAck.getString("type")); + assertEquals(2, heartbeatAck.getInteger("term")); + assertEquals(7, heartbeatAck.getInteger("pid")); + assertEquals(11, heartbeatAck.getInteger("targetPid")); + assertEquals(2, getIntField("currentTerm")); + assertEquals(11, getIntField("leaderId")); + assertFalse(getBooleanField("isLeader")); + assertFalse(getBooleanField("isCandidate")); + assertEquals(350L, getLongField("lastHeartbeatTime")); + assertEquals(4850L, taskCaptor.getValue().getTaskTime()); + } + + @Test void testVoteResponseMajorityPromotesCandidateToLeader() throws Exception { protocol.currentContextIsServer(false); setIntField("currentTerm", 3); @@ -477,6 +515,42 @@ class VSRaftProtocolTest { } @Test + void testServerReceiveHeartbeatAckForLeaderLogsAck() throws Exception { + protocol.onStart(); + clearInvocations(mockProcess, mockTaskManager); + + VSMessage heartbeatAck = new VSMessage(); + heartbeatAck.setString("type", "heartbeatAck"); + heartbeatAck.setInteger("term", 0); + heartbeatAck.setInteger("pid", 2); + heartbeatAck.setInteger("targetPid", 7); + + protocol.onServerRecv(heartbeatAck); + + verify(mockProcess).log("Heartbeat ACK from process 2 received"); + verify(mockTaskManager, never()).removeAllTasks(any()); + verify(mockTaskManager, never()).addTask(any()); + } + + @Test + void testHeartbeatAckForDifferentTargetDoesNotLog() throws Exception { + protocol.onStart(); + clearInvocations(mockProcess, mockTaskManager); + + VSMessage heartbeatAck = new VSMessage(); + heartbeatAck.setString("type", "heartbeatAck"); + heartbeatAck.setInteger("term", 0); + heartbeatAck.setInteger("pid", 2); + heartbeatAck.setInteger("targetPid", 99); + + protocol.onServerRecv(heartbeatAck); + + verify(mockProcess, never()).log(anyString()); + verify(mockTaskManager, never()).removeAllTasks(any()); + verify(mockTaskManager, never()).addTask(any()); + } + + @Test void testDuplicateVoteResponsesFromSamePeerDoNotCreateMajority() throws Exception { protocol.currentContextIsServer(false); |
