diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-27 06:09:31 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-27 06:09:31 +0200 |
| commit | c5e06e480d01f4f87d02b5f04e873f44a679c741 (patch) | |
| tree | 461cbf5b11edd843dd3599ae9a415f6e1d8aac75 /src/main/java | |
| parent | e3dfc7a62fc1eb27a9fb68dd530064cdd2d5bb07 (diff) | |
Fix Raft append replication review issues b85586a4-4eb9-4686-93c7-0ab14173baa5
Diffstat (limited to 'src/main/java')
| -rw-r--r-- | src/main/java/protocols/implementations/VSRaftProtocol.java | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 983c8d3..8c36b68 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -277,6 +277,8 @@ public class VSRaftProtocol extends VSAbstractProtocol { * Sends a simplified append-entry request for the configured log entry. */ private void sendAppendEntry() { + ackPids.clear(); + if (getVectorKeySet().contains("pids")) { ackPids.addAll(getVector("pids")); } @@ -381,6 +383,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void handleAppendEntry(VSMessage recvMessage) { int messageTerm = recvMessage.getInteger("term"); int messageLeaderId = recvMessage.getInteger("leaderId"); + int messageLogIndex = recvMessage.getInteger("logIndex"); if (messageTerm > currentTerm) { becomeFollower(messageTerm, messageLeaderId); @@ -393,13 +396,17 @@ public class VSRaftProtocol extends VSAbstractProtocol { return; } - logIndex++; + if (messageLogIndex != logIndex + 1) { + return; + } + + logIndex = messageLogIndex; VSMessage appendAck = new VSMessage(); appendAck.setString("type", "appendAck"); appendAck.setInteger("term", currentTerm); appendAck.setInteger("pid", process.getProcessID()); - appendAck.setInteger("logIndex", logIndex); + appendAck.setInteger("logIndex", messageLogIndex); appendAck.setInteger("targetPid", messageLeaderId); sendMessage(appendAck); } @@ -410,17 +417,25 @@ public class VSRaftProtocol extends VSAbstractProtocol { * @param recvMessage the append acknowledgement */ private void handleAppendAck(VSMessage recvMessage) { + int messageTerm = recvMessage.getInteger("term"); Integer responderPid = recvMessage.getIntegerObj("pid"); + int ackLogIndex = recvMessage.getInteger("logIndex"); + + if (messageTerm > currentTerm) { + becomeFollower(messageTerm, -1); + return; + } if (!isLeader || !isForMe(recvMessage) || responderPid == null || + messageTerm != currentTerm || ackLogIndex != logIndex || !ackPids.contains(responderPid)) { return; } ackPids.remove(responderPid); - if (ackPids.isEmpty()) { - commitIndex++; + if (ackPids.isEmpty() && commitIndex < ackLogIndex) { + commitIndex = ackLogIndex; log("Committed log index " + commitIndex); } } |
