diff options
Diffstat (limited to 'src/main/java/protocols/implementations/VSRaftProtocol.java')
| -rw-r--r-- | src/main/java/protocols/implementations/VSRaftProtocol.java | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 56e758d..4a310ee 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -70,6 +70,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { public void onClientInit() { initLong("electionTimeout", 4000, "Base election timeout", "ms"); initLong("electionJitter", 2000, "Election timeout jitter", "ms"); + resetElectionTimeout(); } /* (non-Javadoc) @@ -104,6 +105,11 @@ public class VSRaftProtocol extends VSAbstractProtocol { * @see protocols.VSAbstractProtocol#onClientSchedule() */ public void onClientSchedule() { + long elapsedSinceHeartbeat = process.getTime() - lastHeartbeatTime; + + if (!isLeader && elapsedSinceHeartbeat >= getLong("electionTimeout")) { + startElection(); + } } /* (non-Javadoc) @@ -147,12 +153,61 @@ public class VSRaftProtocol extends VSAbstractProtocol { private void becomeLeader() { isLeader = true; isCandidate = false; + removeSchedules(); leaderId = process.getProcessID(); lastHeartbeatTime = process.getTime(); sendHeartbeat(); } /** + * Transitions this process into the follower role for the supplied term. + * + * @param term the term to adopt + * @param newLeaderId the known leader in that term, or -1 if unknown + */ + private void becomeFollower(int term, int newLeaderId) { + isLeader = false; + isCandidate = false; + currentTerm = term; + leaderId = newLeaderId; + votedFor = -1; + votesReceived = 0; + resetElectionTimeout(); + } + + /** + * Resets the follower election timeout using a randomized client schedule. + */ + private void resetElectionTimeout() { + long jitterPercentage = Math.abs(process.getRandomPercentage()); + long jitter = (getLong("electionJitter") * jitterPercentage) / 100L; + + removeSchedules(); + scheduleAt(process.getTime() + getLong("electionTimeout") + jitter); + } + + /** + * Starts a new election and re-arms the candidate timeout. + */ + private void startElection() { + currentTerm++; + votedFor = process.getProcessID(); + votesReceived = 1; + isLeader = false; + isCandidate = true; + leaderId = -1; + lastHeartbeatTime = process.getTime(); + + VSMessage voteRequest = new VSMessage(); + voteRequest.setString("type", "voteRequest"); + voteRequest.setInteger("term", currentTerm); + voteRequest.setInteger("candidateId", process.getProcessID()); + sendMessage(voteRequest); + + resetElectionTimeout(); + } + + /** * Sends a heartbeat and schedules the next leader heartbeat interval. */ private void sendHeartbeat() { |
