summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-27 06:09:31 +0200
committerPaul Buetow <paul@buetow.org>2026-03-27 06:09:31 +0200
commitc5e06e480d01f4f87d02b5f04e873f44a679c741 (patch)
tree461cbf5b11edd843dd3599ae9a415f6e1d8aac75 /src/main/java
parente3dfc7a62fc1eb27a9fb68dd530064cdd2d5bb07 (diff)
Fix Raft append replication review issues b85586a4-4eb9-4686-93c7-0ab14173baa5
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/protocols/implementations/VSRaftProtocol.java23
1 files changed, 19 insertions, 4 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java
index 983c8d3..8c36b68 100644
--- a/src/main/java/protocols/implementations/VSRaftProtocol.java
+++ b/src/main/java/protocols/implementations/VSRaftProtocol.java
@@ -277,6 +277,8 @@ public class VSRaftProtocol extends VSAbstractProtocol {
* Sends a simplified append-entry request for the configured log entry.
*/
private void sendAppendEntry() {
+ ackPids.clear();
+
if (getVectorKeySet().contains("pids")) {
ackPids.addAll(getVector("pids"));
}
@@ -381,6 +383,7 @@ public class VSRaftProtocol extends VSAbstractProtocol {
private void handleAppendEntry(VSMessage recvMessage) {
int messageTerm = recvMessage.getInteger("term");
int messageLeaderId = recvMessage.getInteger("leaderId");
+ int messageLogIndex = recvMessage.getInteger("logIndex");
if (messageTerm > currentTerm) {
becomeFollower(messageTerm, messageLeaderId);
@@ -393,13 +396,17 @@ public class VSRaftProtocol extends VSAbstractProtocol {
return;
}
- logIndex++;
+ if (messageLogIndex != logIndex + 1) {
+ return;
+ }
+
+ logIndex = messageLogIndex;
VSMessage appendAck = new VSMessage();
appendAck.setString("type", "appendAck");
appendAck.setInteger("term", currentTerm);
appendAck.setInteger("pid", process.getProcessID());
- appendAck.setInteger("logIndex", logIndex);
+ appendAck.setInteger("logIndex", messageLogIndex);
appendAck.setInteger("targetPid", messageLeaderId);
sendMessage(appendAck);
}
@@ -410,17 +417,25 @@ public class VSRaftProtocol extends VSAbstractProtocol {
* @param recvMessage the append acknowledgement
*/
private void handleAppendAck(VSMessage recvMessage) {
+ int messageTerm = recvMessage.getInteger("term");
Integer responderPid = recvMessage.getIntegerObj("pid");
+ int ackLogIndex = recvMessage.getInteger("logIndex");
+
+ if (messageTerm > currentTerm) {
+ becomeFollower(messageTerm, -1);
+ return;
+ }
if (!isLeader || !isForMe(recvMessage) || responderPid == null ||
+ messageTerm != currentTerm || ackLogIndex != logIndex ||
!ackPids.contains(responderPid)) {
return;
}
ackPids.remove(responderPid);
- if (ackPids.isEmpty()) {
- commitIndex++;
+ if (ackPids.isEmpty() && commitIndex < ackLogIndex) {
+ commitIndex = ackLogIndex;
log("Committed log index " + commitIndex);
}
}