1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
|
package protocols.implementations;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import core.VSMessage;
import core.VSInternalProcess;
import protocols.VSAbstractProtocol;
/**
 * Implementation of the Raft consensus algorithm.
 *
 * Raft is a consensus algorithm designed to be understandable. It ensures that
 * a distributed system agrees on values even in the presence of failures.
 *
 * <p>The protocol has three states:</p>
 * <ul>
 * <li>Follower - Passive state, responds to leaders</li>
 * <li>Candidate - Actively requesting votes to become leader</li>
 * <li>Leader - Manages the cluster and log replication</li>
 * </ul>
 *
 * <p>Key features implemented:</p>
 * <ul>
 * <li>Leader election with randomized timeouts</li>
 * <li>Log replication for state machine commands</li>
 * <li>Safety through term numbers and log matching</li>
 * </ul>
 *
 * <p>Simplifications: cluster membership is fixed (no membership changes),
 * the "persistent" server state is only kept in memory, and client requests
 * are acknowledged as soon as the leader appends them, before they commit.</p>
 *
 * <p>Point-to-point messages carry a {@code receiverNum} field; a process
 * ignores such messages when they are addressed to a different process.</p>
 *
 * @author Paul C. Buetow
 */
public class VSRaftProtocol extends VSAbstractProtocol {
    /** Role of this server in its current term. */
    private enum State {
        FOLLOWER,
        CANDIDATE,
        LEADER
    }

    // Message types
    private static final String MSG_REQUEST_VOTE = "REQUEST_VOTE";
    private static final String MSG_VOTE_RESPONSE = "VOTE_RESPONSE";
    private static final String MSG_APPEND_ENTRIES = "APPEND_ENTRIES";
    private static final String MSG_APPEND_RESPONSE = "APPEND_RESPONSE";
    private static final String MSG_CLIENT_REQUEST = "CLIENT_REQUEST";
    private static final String MSG_CLIENT_RESPONSE = "CLIENT_RESPONSE";

    // Timing constants (in simulation time units)
    private static final long HEARTBEAT_INTERVAL = 50;
    private static final long ELECTION_TIMEOUT_MIN = 150;
    private static final long ELECTION_TIMEOUT_MAX = 300;

    // Server state (persistent - should be saved to stable storage)
    private State currentState;
    private int currentTerm;
    private Integer votedFor;
    private List<LogEntry> log;

    // Server state (volatile)
    private int commitIndex;
    private int lastApplied;

    // Leader state (reinitialized after election)
    private Map<Integer, Integer> nextIndex;
    private Map<Integer, Integer> matchIndex;

    // Candidate state
    private Set<Integer> votesReceived;
    /** Absolute simulation time at which a follower/candidate starts a new election. */
    private long electionTimeout;

    // General state
    private Integer currentLeader;
    private long lastHeartbeat;

    // Client state
    private boolean clientHasScheduled = false;
    private int clientRequestCount = 0;

    /**
     * Log entry structure
     */
    private static class LogEntry {
        final int term;
        final String command;
        final long timestamp;

        LogEntry(int term, String command, long timestamp) {
            this.term = term;
            this.command = command;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return String.format("LogEntry{term=%d, cmd='%s', time=%d}",
                    term, command, timestamp);
        }
    }

    public VSRaftProtocol() {
        super(VSAbstractProtocol.HAS_ON_SERVER_START);
    }

    @Override
    public void onServerInit() {
        currentState = State.FOLLOWER;
        currentTerm = 0;
        votedFor = null;
        log = new ArrayList<>();
        commitIndex = 0;
        lastApplied = 0;
        nextIndex = new ConcurrentHashMap<>();
        matchIndex = new ConcurrentHashMap<>();
        votesReceived = new HashSet<>();
        currentLeader = null;
        // Add a dummy entry at index 0 for easier indexing
        log.add(new LogEntry(0, "INIT", 0));
    }

    @Override
    public void onServerStart() {
        // Initialize election timeout and start
        resetElectionTimeout();
        raftLog("Raft node initialized as FOLLOWER");
        scheduleElectionTimeout();
    }

    @Override
    public void onServerReset() {
        onServerInit();
        removeSchedules();
    }

    /**
     * Dispatches an incoming message on the server side.
     *
     * <p>Client requests are handled directly because they carry no term.
     * Server RPCs from this process itself and point-to-point RPCs addressed to
     * another process are dropped. Any other message type is dropped too. An RPC
     * with a higher term makes this server adopt that term and step down before
     * the RPC is processed.</p>
     *
     * @param message the received message
     */
    @Override
    public void onServerRecv(VSMessage message) {
        String msgType = message.getString("type");
        int senderId = message.getSendingProcess().getProcessNum();

        // Client requests carry no "term", so they bypass the term bookkeeping below.
        if (MSG_CLIENT_REQUEST.equals(msgType)) {
            handleClientRequest(message, senderId);
            return;
        }
        if (!isRaftRpc(msgType)
                || senderId == process.getProcessNum()
                || isAddressedElsewhere(message, msgType)) {
            return;
        }

        int term = message.getInteger("term");
        // If we receive a message with a higher term, become follower
        if (term > currentTerm) {
            currentTerm = term;
            votedFor = null;
            if (currentState != State.FOLLOWER) {
                becomeFollower();
            }
        }

        switch (msgType) {
            case MSG_REQUEST_VOTE:
                handleRequestVote(message, senderId);
                break;
            case MSG_VOTE_RESPONSE:
                handleVoteResponse(message, senderId);
                break;
            case MSG_APPEND_ENTRIES:
                handleAppendEntries(message, senderId);
                break;
            case MSG_APPEND_RESPONSE:
                handleAppendResponse(message, senderId);
                break;
            default:
                break;
        }
    }

    /**
     * Timer callback: leaders send heartbeats; followers/candidates start an
     * election once {@link #electionTimeout} has passed.
     */
    @Override
    public void onServerSchedule() {
        long currentTime = process.getTime();
        switch (currentState) {
            case FOLLOWER:
            case CANDIDATE:
                if (currentTime >= electionTimeout) {
                    startElection();
                } else {
                    // The deadline was pushed back (heartbeat received or vote granted)
                    // after this wake-up was scheduled. Re-arm for the new deadline;
                    // otherwise the timer would be lost and a crashed leader never replaced.
                    scheduleAt(electionTimeout);
                }
                break;
            case LEADER:
                // Send heartbeats
                sendHeartbeats();
                scheduleAt(currentTime + HEARTBEAT_INTERVAL);
                break;
        }
    }

    @Override
    public void onClientInit() {
        // Initialize client state
        clientHasScheduled = false;
        clientRequestCount = 0;
    }

    @Override
    public void onClientStart() {
        // This method is never called when using HAS_ON_SERVER_START
        // Clients will send requests in response to server heartbeats instead
    }

    @Override
    public void onClientReset() {
        removeSchedules();
        clientHasScheduled = false;
        clientRequestCount = 0;
    }

    /**
     * Client-side receive: logs replies addressed to this client and uses the
     * first observed AppendEntries (from any leader) as the trigger to start
     * sending requests.
     *
     * @param message the received message
     */
    @Override
    public void onClientRecv(VSMessage message) {
        String msgType = message.getString("type");
        if (MSG_CLIENT_RESPONSE.equals(msgType)) {
            // Servers address each reply to one client; ignore replies meant for others.
            int receiver = message.getInteger("receiverNum");
            if (receiver != process.getProcessNum()) {
                return;
            }
            boolean success = message.getBoolean("success");
            String result = message.getString("result");
            raftLog("Client received response: success=" + success + ", result=" + result);
        } else if (MSG_APPEND_ENTRIES.equals(msgType)) {
            // Any heartbeat (whoever it is addressed to) shows a leader exists -
            // good time to start sending requests.
            if (!clientHasScheduled) {
                clientHasScheduled = true;
                // Schedule first client request after a short delay
                scheduleAt(process.getTime() + 100);
            }
        }
    }

    @Override
    public void onClientSchedule() {
        // Send a test client request
        VSMessage request = new VSMessage();
        request.setString("type", MSG_CLIENT_REQUEST);
        request.setString("command", "SET x=" + process.getRandomPercentage());
        request.setLong("clientId", process.getProcessNum());
        request.setLong("requestId", System.currentTimeMillis());
        sendMessage(request);
        raftLog("Client sent request #" + clientRequestCount + ": " + request.getString("command"));
        // Update request count
        clientRequestCount++;
        // Schedule next request after a delay
        if (clientRequestCount < 10) { // Limit number of requests for testing
            scheduleAt(process.getTime() + 1000 + process.getRandomPercentage() * 10);
        }
    }

    // --- Raft Algorithm Implementation ---

    /**
     * Becomes a candidate for the next term, votes for itself and broadcasts
     * RequestVote. Becomes leader immediately if the self-vote alone is a
     * majority (single-server cluster).
     */
    private void startElection() {
        currentState = State.CANDIDATE;
        currentTerm++;
        votedFor = process.getProcessNum();
        currentLeader = null;
        votesReceived.clear();
        votesReceived.add(process.getProcessNum()); // Vote for self
        raftLog("Starting election for term " + currentTerm);

        // Send RequestVote to all other servers
        VSMessage voteRequest = new VSMessage();
        voteRequest.setString("type", MSG_REQUEST_VOTE);
        voteRequest.setInteger("term", currentTerm);
        voteRequest.setInteger("candidateId", process.getProcessNum());
        voteRequest.setInteger("lastLogIndex", log.size() - 1);
        voteRequest.setInteger("lastLogTerm", log.get(log.size() - 1).term);
        sendMessage(voteRequest);

        // Reset election timeout
        resetElectionTimeout();
        scheduleElectionTimeout();

        // With no peers to answer, no VOTE_RESPONSE would ever trigger the check.
        if (hasMajority(votesReceived.size())) {
            becomeLeader();
        }
    }

    /**
     * Answers a RequestVote RPC. The vote is granted when the request is for our
     * current term (higher terms were adopted in onServerRecv), we have not voted
     * for someone else in this term, and the candidate's log is at least as
     * up-to-date as ours.
     *
     * @param message     the RequestVote message
     * @param candidateId process number of the requesting candidate
     */
    private void handleRequestVote(VSMessage message, int candidateId) {
        int term = message.getInteger("term");
        int lastLogIndex = message.getInteger("lastLogIndex");
        int lastLogTerm = message.getInteger("lastLogTerm");
        boolean voteGranted = false;

        if (term >= currentTerm &&
                (votedFor == null || votedFor == candidateId) &&
                isLogUpToDate(lastLogIndex, lastLogTerm)) {
            votedFor = candidateId;
            voteGranted = true;
            resetElectionTimeout();
            raftLog("Voted for candidate " + candidateId + " in term " + term);
        }

        // Send vote response
        VSMessage response = new VSMessage();
        response.setString("type", MSG_VOTE_RESPONSE);
        response.setInteger("term", currentTerm);
        response.setBoolean("voteGranted", voteGranted);
        response.setInteger("senderId", process.getProcessNum());
        // Send directly to candidate
        response.setInteger("receiverNum", candidateId);
        sendMessage(response);
    }

    /**
     * Counts a granted vote for the current election and becomes leader once a
     * majority is reached. Responses from earlier elections are ignored.
     *
     * @param message  the vote response
     * @param senderId process number of the voter
     */
    private void handleVoteResponse(VSMessage message, int senderId) {
        if (currentState != State.CANDIDATE) {
            return;
        }
        // A vote granted in an earlier term must not count towards this election.
        if (message.getInteger("term") != currentTerm) {
            return;
        }
        boolean voteGranted = message.getBoolean("voteGranted");
        if (voteGranted) {
            votesReceived.add(senderId);
            raftLog("Received vote from " + senderId + " (total: " + votesReceived.size() + ")");
            // Check if we have majority
            if (hasMajority(votesReceived.size())) {
                becomeLeader();
            }
        }
    }

    private void becomeLeader() {
        currentState = State.LEADER;
        currentLeader = process.getProcessNum();
        raftLog("Became LEADER for term " + currentTerm);

        // Initialize leader state
        nextIndex.clear();
        matchIndex.clear();
        for (int i = 0; i < getNumProcesses(); i++) {
            if (i != process.getProcessNum()) {
                nextIndex.put(i, log.size());
                matchIndex.put(i, 0);
            }
        }

        // Send initial heartbeats immediately
        sendHeartbeats();
        // Schedule regular heartbeats
        removeSchedules();
        scheduleAt(process.getTime() + HEARTBEAT_INTERVAL);

        // Highlight the leader visually
        if (process instanceof VSInternalProcess) {
            ((VSInternalProcess) process).highlightOn();
        }
    }

    private void becomeFollower() {
        currentState = State.FOLLOWER;
        raftLog("Became FOLLOWER for term " + currentTerm);
        // Remove leader highlighting
        if (process instanceof VSInternalProcess) {
            ((VSInternalProcess) process).highlightOff();
        }
        // Reset election timeout
        resetElectionTimeout();
        scheduleElectionTimeout();
    }

    private void sendHeartbeats() {
        for (int i = 0; i < getNumProcesses(); i++) {
            if (i != process.getProcessNum()) {
                sendAppendEntries(i);
            }
        }
    }

    /**
     * Sends an AppendEntries RPC to one follower carrying every log entry from
     * that follower's nextIndex onwards (none for a pure heartbeat).
     *
     * @param followerId process number of the target follower
     */
    private void sendAppendEntries(int followerId) {
        int nextIdx = nextIndex.getOrDefault(followerId, 1);
        int prevLogIndex = nextIdx - 1;
        int prevLogTerm = prevLogIndex >= 0 && prevLogIndex < log.size() ? log.get(prevLogIndex).term : 0;

        VSMessage appendEntries = new VSMessage();
        appendEntries.setString("type", MSG_APPEND_ENTRIES);
        appendEntries.setInteger("term", currentTerm);
        appendEntries.setInteger("leaderId", process.getProcessNum());
        appendEntries.setInteger("prevLogIndex", prevLogIndex);
        appendEntries.setInteger("prevLogTerm", prevLogTerm);
        appendEntries.setInteger("leaderCommit", commitIndex);

        // Include log entries if needed
        List<LogEntry> entries = new ArrayList<>();
        for (int i = nextIdx; i < log.size(); i++) {
            entries.add(log.get(i));
        }
        // For simplicity, we'll send entry count and details separately
        appendEntries.setInteger("entryCount", entries.size());
        for (int i = 0; i < entries.size(); i++) {
            LogEntry entry = entries.get(i);
            appendEntries.setInteger("entry_" + i + "_term", entry.term);
            appendEntries.setString("entry_" + i + "_cmd", entry.command);
            appendEntries.setLong("entry_" + i + "_time", entry.timestamp);
        }
        appendEntries.setInteger("receiverNum", followerId);
        sendMessage(appendEntries);
    }

    /**
     * Handles an AppendEntries RPC (heartbeat or replication) and always replies
     * with APPEND_RESPONSE.
     *
     * <p>RPCs from an older term are rejected without touching the election
     * timer. Otherwise the sender is accepted as leader (a same-term candidate
     * steps down), and if our log holds prevLogIndex with prevLogTerm, our
     * entries are deleted only from the first index whose term conflicts with
     * the leader's, missing entries are appended and the commit index advanced.
     * On success the reported matchIndex is prevLogIndex + entryCount, i.e.
     * only the prefix verified against the leader.</p>
     *
     * @param message  the AppendEntries message
     * @param leaderId process number of the sending leader
     */
    private void handleAppendEntries(VSMessage message, int leaderId) {
        int term = message.getInteger("term");
        int prevLogIndex = message.getInteger("prevLogIndex");
        int prevLogTerm = message.getInteger("prevLogTerm");
        int leaderCommit = message.getInteger("leaderCommit");

        boolean success = false;
        int matchIdx = log.size() - 1;

        // Higher terms were adopted in onServerRecv, so term < currentTerm means a
        // deposed leader; it learns the newer term from our reply.
        if (term >= currentTerm) {
            if (currentState != State.FOLLOWER) {
                // A candidate hearing from the leader of its own term steps down.
                becomeFollower();
            }
            // Reset election timeout when we hear from leader
            resetElectionTimeout();
            lastHeartbeat = process.getTime();
            currentLeader = leaderId;

            // Check if log matches at prevLogIndex
            if (prevLogIndex == 0 ||
                    (prevLogIndex > 0 && prevLogIndex < log.size()
                            && log.get(prevLogIndex).term == prevLogTerm)) {
                success = true;
                int entryCount = message.getInteger("entryCount");
                for (int i = 0; i < entryCount; i++) {
                    int index = prevLogIndex + 1 + i;
                    int entryTerm = message.getInteger("entry_" + i + "_term");
                    if (index < log.size()) {
                        if (log.get(index).term == entryTerm) {
                            // Already present (duplicate or reordered RPC): keep it and what follows.
                            continue;
                        }
                        // Conflict: drop this entry and everything after it.
                        log.subList(index, log.size()).clear();
                    }
                    String entryCmd = message.getString("entry_" + i + "_cmd");
                    long entryTime = message.getLong("entry_" + i + "_time");
                    log.add(new LogEntry(entryTerm, entryCmd, entryTime));
                    raftLog("Appended log entry: " + entryCmd);
                }
                // Entries beyond this index were not checked against the leader.
                matchIdx = prevLogIndex + entryCount;

                // Update commit index (never move it backwards on a stale RPC)
                int newCommit = Math.min(leaderCommit, matchIdx);
                if (newCommit > commitIndex) {
                    commitIndex = newCommit;
                    applyStateMachine();
                }
            }
        }

        // Send response
        VSMessage response = new VSMessage();
        response.setString("type", MSG_APPEND_RESPONSE);
        response.setInteger("term", currentTerm);
        response.setBoolean("success", success);
        response.setInteger("senderId", process.getProcessNum());
        response.setInteger("matchIndex", matchIdx);
        response.setInteger("receiverNum", leaderId);
        sendMessage(response);
    }

    /**
     * Processes a follower's AppendEntries reply: on success records how far the
     * follower's log matches (never moving backwards) and tries to advance the
     * commit index; on failure backs nextIndex off by one for the next retry.
     * Replies to RPCs from earlier terms are ignored.
     *
     * @param message    the append response
     * @param followerId process number of the replying follower
     */
    private void handleAppendResponse(VSMessage message, int followerId) {
        if (currentState != State.LEADER) {
            return;
        }
        // A reply to an RPC sent in an earlier term says nothing about our current log.
        if (message.getInteger("term") != currentTerm) {
            return;
        }
        boolean success = message.getBoolean("success");
        int matchIdx = message.getInteger("matchIndex");

        if (success) {
            // Replies may arrive out of order; an older one must not lower matchIndex.
            int newMatch = Math.max(matchIndex.getOrDefault(followerId, 0), matchIdx);
            matchIndex.put(followerId, newMatch);
            nextIndex.put(followerId, newMatch + 1);
            // Check if we can advance commit index
            updateCommitIndex();
        } else {
            // Decrement nextIndex and retry
            int next = nextIndex.getOrDefault(followerId, 1);
            if (next > 1) {
                nextIndex.put(followerId, next - 1);
            }
        }
    }

    /**
     * Appends a client command to the leader's log and replies optimistically;
     * non-leaders reply with a failure naming the leader they know of.
     *
     * @param message  the client request
     * @param clientId process number of the requesting client
     */
    private void handleClientRequest(VSMessage message, int clientId) {
        if (currentState != State.LEADER) {
            // Redirect to leader or reject
            VSMessage response = new VSMessage();
            response.setString("type", MSG_CLIENT_RESPONSE);
            response.setBoolean("success", false);
            response.setString("result", "Not leader. Leader is: " + currentLeader);
            response.setInteger("receiverNum", clientId);
            sendMessage(response);
            return;
        }

        // Append to log
        String command = message.getString("command");
        LogEntry entry = new LogEntry(currentTerm, command, process.getTime());
        log.add(entry);
        raftLog("Leader received client request: " + command);
        // Commits immediately only when the leader alone is a majority (single server);
        // otherwise the entry commits once replicated to a majority.
        updateCommitIndex();

        // For now, send optimistic response
        VSMessage response = new VSMessage();
        response.setString("type", MSG_CLIENT_RESPONSE);
        response.setBoolean("success", true);
        response.setString("result", "Command logged: " + command);
        response.setInteger("receiverNum", clientId);
        sendMessage(response);
    }

    // --- Helper Methods ---

    /** Returns true for the server-to-server RPC types, all of which carry a term. */
    private static boolean isRaftRpc(String msgType) {
        return MSG_REQUEST_VOTE.equals(msgType)
                || MSG_VOTE_RESPONSE.equals(msgType)
                || MSG_APPEND_ENTRIES.equals(msgType)
                || MSG_APPEND_RESPONSE.equals(msgType);
    }

    /**
     * Returns whether a point-to-point RPC names another process in its
     * {@code receiverNum} field. RequestVote is broadcast and never sets it.
     */
    private boolean isAddressedElsewhere(VSMessage message, String msgType) {
        if (MSG_REQUEST_VOTE.equals(msgType)) {
            return false;
        }
        int receiver = message.getInteger("receiverNum");
        return receiver != process.getProcessNum();
    }

    /** Returns whether {@code count} servers form a strict majority of all processes. */
    private boolean hasMajority(int count) {
        return count > getNumProcesses() / 2;
    }

    private boolean isLogUpToDate(int lastLogIndex, int lastLogTerm) {
        int ourLastIndex = log.size() - 1;
        int ourLastTerm = log.get(ourLastIndex).term;
        return lastLogTerm > ourLastTerm ||
                (lastLogTerm == ourLastTerm && lastLogIndex >= ourLastIndex);
    }

    private void resetElectionTimeout() {
        if (process != null) {
            long timeout = ELECTION_TIMEOUT_MIN +
                    (long) (Math.random() * (ELECTION_TIMEOUT_MAX - ELECTION_TIMEOUT_MIN));
            electionTimeout = process.getTime() + timeout;
        }
    }

    private void scheduleElectionTimeout() {
        removeSchedules();
        scheduleAt(electionTimeout);
    }

    /**
     * Advances commitIndex to the highest current-term entry stored on a
     * majority of servers (the leader counts itself).
     */
    private void updateCommitIndex() {
        // Find the highest index that has been replicated to majority
        for (int n = log.size() - 1; n > commitIndex; n--) {
            if (log.get(n).term == currentTerm) {
                int replicatedCount = 1; // Leader has it
                for (int matchIdx : matchIndex.values()) {
                    if (matchIdx >= n) {
                        replicatedCount++;
                    }
                }
                if (hasMajority(replicatedCount)) {
                    commitIndex = n;
                    applyStateMachine();
                    break;
                }
            }
        }
    }

    private void applyStateMachine() {
        while (lastApplied < commitIndex) {
            lastApplied++;
            LogEntry entry = log.get(lastApplied);
            raftLog("Applied to state machine: " + entry.command);
        }
    }

    private void raftLog(String message) {
        String stateStr = currentState != null ? currentState.toString() : "CLIENT";
        String prefix = String.format("[%s T:%d N:%d] ",
                stateStr, currentTerm, process.getProcessNum());
        process.log(prefix + message);
    }

    @Override
    public String toString() {
        return super.toString() + " - Raft Consensus";
    }
}
|