diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-27 13:05:59 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-27 13:05:59 +0200 |
| commit | f55d67d98dddf5861dc4266564863dde4b0b6ed1 (patch) | |
| tree | b87ad04a444b9873cb9ec0ede643772f160e6012 /src/main | |
| parent | 29252fbc7803e4a660332524d5811ff942e2b0bc (diff) | |
sr: retune Raft replay for faster reelection
Diffstat (limited to 'src/main')
10 files changed, 276 insertions, 17 deletions
diff --git a/src/main/java/events/VSRegisteredEvents.java b/src/main/java/events/VSRegisteredEvents.java index 8d31b0e..2ce24ea 100644 --- a/src/main/java/events/VSRegisteredEvents.java +++ b/src/main/java/events/VSRegisteredEvents.java @@ -88,6 +88,8 @@ public final class VSRegisteredEvents { registerEvent("events.implementations.VSVectorTimestampEvent"); registerEvent("events.implementations.VSTimestampMonitorEvent"); registerEvent("events.implementations.VSTimestampTriggeredEvent"); + registerEvent("events.internal.VSProtocolEvent"); + registerEvent("events.internal.VSProtocolScheduleEvent"); registerEvent("protocols.implementations.VSBasicMulticastProtocol"); registerEvent("protocols.implementations.VSBerkelyTimeProtocol"); registerEvent("protocols.implementations.VSBroadcastProtocol"); diff --git a/src/main/java/events/internal/VSProtocolEvent.java b/src/main/java/events/internal/VSProtocolEvent.java index 5cbe6df..41adc3c 100644 --- a/src/main/java/events/internal/VSProtocolEvent.java +++ b/src/main/java/events/internal/VSProtocolEvent.java @@ -3,6 +3,8 @@ package events.internal; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; import core.VSInternalProcess; import core.VSTask; @@ -43,6 +45,9 @@ public class VSProtocolEvent extends VSAbstractInternalEvent /** The event is a protocol activation if true. Else it is a deactivation */ private boolean isProtocolActivation; + /** Optional long preference overrides applied when the protocol starts. */ + private HashMap<String, Long> longOverrides; + /* (non-Javadoc) * @see events.VSCopyableEvent#initCopy(events.VSAbstractEvent) */ @@ -51,6 +56,10 @@ public class VSProtocolEvent extends VSAbstractInternalEvent protocolEventCopy.isClientProtocol(isClientProtocol); protocolEventCopy.isProtocolActivation(isProtocolActivation); protocolEventCopy.setProtocolClassname(protocolClassname); + if (longOverrides != null) { + protocolEventCopy.longOverrides = + new HashMap<String, Long>(longOverrides); + } } /* (non-Javadoc) @@ -108,6 +117,20 @@ public class VSProtocolEvent extends VSAbstractInternalEvent public void setProtocolClassname(String protocolClassname) { this.protocolClassname = protocolClassname; } + + /** + * Overrides a long preference before the protocol starts. + * + * @param key the preference key + * @param value the value to apply + */ + public void setLongOverride(String key, long value) { + if (longOverrides == null) { + longOverrides = new HashMap<String, Long>(); + } + + longOverrides.put(key, Long.valueOf(value)); + } /** * Checks if we should schedule a protocol start task. @@ -158,6 +181,8 @@ public class VSProtocolEvent extends VSAbstractInternalEvent VSAbstractProtocol protocol = internalProcess.getProtocolObject(protocolClassname); + applyLongOverrides(protocol); + if (isClientProtocol) protocol.isClient(isProtocolActivation); else @@ -215,7 +240,9 @@ public class VSProtocolEvent extends VSAbstractInternalEvent objectOutputStream.writeObject(Boolean.valueOf(isProtocolActivation)); /** For later backwards compatibility, to add more stuff */ - objectOutputStream.writeObject(Boolean.valueOf(false)); + objectOutputStream.writeObject(longOverrides == null + ? Boolean.valueOf(false) + : new HashMap<String, Long>(longOverrides)); } /* (non-Javadoc) @@ -242,7 +269,17 @@ public class VSProtocolEvent extends VSAbstractInternalEvent this.setShortname(createShortname(null)); /** For later backwards compatibility, to add more stuff */ - objectInputStream.readObject(); + Object overrides = objectInputStream.readObject(); + if (overrides instanceof Map<?, ?> map) { + longOverrides = new HashMap<String, Long>(); + for (Map.Entry<?, ?> entry : map.entrySet()) { + if (entry.getKey() instanceof String + && entry.getValue() instanceof Long) { + longOverrides.put((String) entry.getKey(), + (Long) entry.getValue()); + } + } + } } protected String createShortname(String savedShortname) { @@ -266,4 +303,14 @@ public class VSProtocolEvent extends VSAbstractInternalEvent prefs.getString("lang.deactivated"); return protocolShortname + " " + clientServer + " " + activateDeactivate; } + + private void applyLongOverrides(VSAbstractProtocol protocol) { + if (longOverrides == null || longOverrides.isEmpty()) { + return; + } + + for (Map.Entry<String, Long> entry : longOverrides.entrySet()) { + protocol.setLong(entry.getKey(), entry.getValue().longValue()); + } + } } diff --git a/src/main/java/events/internal/VSProtocolScheduleEvent.java b/src/main/java/events/internal/VSProtocolScheduleEvent.java index c940212..e01b3da 100644 --- a/src/main/java/events/internal/VSProtocolScheduleEvent.java +++ b/src/main/java/events/internal/VSProtocolScheduleEvent.java @@ -3,7 +3,12 @@ package events.internal; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import core.VSAbstractProcess; +import core.VSInternalProcess; +import events.VSRegisteredEvents; import protocols.VSAbstractProtocol; import serialize.VSNotSerializable; import serialize.VSSerialize; @@ -24,6 +29,12 @@ public class VSProtocolScheduleEvent extends VSAbstractInternalEvent private VSAbstractProtocol protocol; /** + * Create a VSProtocolScheduleEvent object for deserialization. + */ + public VSProtocolScheduleEvent() { + } + + /** * Create a VSProtocolScheduleEvent object * * @param protocol the protocol @@ -41,6 +52,7 @@ public class VSProtocolScheduleEvent extends VSAbstractInternalEvent */ public void onInit() { setClassname(getClass().toString()); + setShortname(createShortname(null)); } /** @@ -85,6 +97,14 @@ public class VSProtocolScheduleEvent extends VSAbstractInternalEvent * @see events.VSAbstractEvent#onStart() */ public void onStart() { + if (protocol == null) { + protocol = resolveProtocolFromProcess(); + } + + if (protocol == null) { + return; + } + if (isServerSchedule) protocol.onServerScheduleStart(); else @@ -122,5 +142,82 @@ public class VSProtocolScheduleEvent extends VSAbstractInternalEvent /** For later backwards compatibility, to add more stuff */ objectInputStream.readObject(); + /** For later backwards compatibility, to add more stuff */ + objectInputStream.readObject(); + + } + + protected String createShortname(String savedShortname) { + if (prefs == null) { + return savedShortname != null + ? savedShortname + : "Protocol Schedule"; + } + + if (protocol == null || protocol.getClassname() == null) { + return prefs.getString("lang.events.internal.VSProtocolScheduleEvent.short"); + } + + String protocolShortname = + VSRegisteredEvents.getShortnameByClassname(protocol.getClassname()); + if (protocolShortname == null) + protocolShortname = protocol.getClassname(); + + return protocolShortname + " " + + (isServerSchedule + ? prefs.getString("lang.server") + : prefs.getString("lang.client")) + + " " + + prefs.getString("lang.events.internal.VSProtocolScheduleEvent.short"); + } + + @SuppressWarnings("unchecked") + private VSAbstractProtocol resolveProtocolFromProcess() { + if (!(process instanceof VSInternalProcess internalProcess)) { + return null; + } + + try { + VSAbstractProtocol raftProtocol = + internalProcess.getProtocolObject( + "protocols.implementations.VSRaftProtocol"); + if (raftProtocol != null) { + return raftProtocol; + } + + Field field = VSAbstractProcess.class.getDeclaredField("protocolsToReset"); + field.setAccessible(true); + + ArrayList<VSAbstractProtocol> protocols = + (ArrayList<VSAbstractProtocol>) field.get(internalProcess); + if (protocols == null || protocols.isEmpty()) { + return null; + } + + VSAbstractProtocol activeProtocol = null; + for (VSAbstractProtocol candidate : protocols) { + if (candidate == null) { + continue; + } + + if ("protocols.implementations.VSRaftProtocol".equals( + candidate.getClassname())) { + return candidate; + } + + if (activeProtocol == null && + (candidate.isServer() || candidate.isClient())) { + activeProtocol = candidate; + } + } + + if (activeProtocol != null) { + return activeProtocol; + } + + return protocols.get(0); + } catch (ReflectiveOperationException e) { + return null; + } } } diff --git a/src/main/java/prefs/VSDefaultPrefs.java b/src/main/java/prefs/VSDefaultPrefs.java index aa0b741..ee8e2bf 100644 --- a/src/main/java/prefs/VSDefaultPrefs.java +++ b/src/main/java/prefs/VSDefaultPrefs.java @@ -160,6 +160,10 @@ public class VSDefaultPrefs extends VSSerializablePrefs { initString("lang.events.implementations.VSProcessCrashEvent.short", "Process Crash"); initString("lang.events.implementations.VSProcessRecoverEvent", "Process Recover Event"); initString("lang.events.implementations.VSProcessRecoverEvent.short", "Process Recover"); + initString("lang.events.internal.VSProtocolEvent", "Protocol Event"); + initString("lang.events.internal.VSProtocolEvent.short", "Protocol Event"); + initString("lang.events.internal.VSProtocolScheduleEvent", "Protocol Schedule Event"); + initString("lang.events.internal.VSProtocolScheduleEvent.short", "Protocol Schedule"); initString("lang.protocols.implementations.VSBasicMulticastProtocol", "Basic Multicast Protocol"); initString("lang.protocols.implementations.VSBasicMulticastProtocol.short", "Basic Multicast"); initString("lang.protocols.implementations.VSBerkelyTimeProtocol", "Berkley algorithm for internal sync."); diff --git a/src/main/java/protocols/VSAbstractProtocol.java b/src/main/java/protocols/VSAbstractProtocol.java index 1695c25..da12d31 100644 --- a/src/main/java/protocols/VSAbstractProtocol.java +++ b/src/main/java/protocols/VSAbstractProtocol.java @@ -333,6 +333,7 @@ abstract public class VSAbstractProtocol extends VSAbstractEvent { VSInternalProcess internalProcess = (VSInternalProcess) process; VSAbstractEvent scheduleEvent = new VSProtocolScheduleEvent(this, currentContextIsServer); + scheduleEvent.init(internalProcess); VSTask scheduleTask = new VSTask(time, internalProcess, scheduleEvent, VSTask.LOCAL); diff --git a/src/main/java/protocols/implementations/VSRaftProtocol.java b/src/main/java/protocols/implementations/VSRaftProtocol.java index 3741e54..c75628d 100644 --- a/src/main/java/protocols/implementations/VSRaftProtocol.java +++ b/src/main/java/protocols/implementations/VSRaftProtocol.java @@ -177,6 +177,7 @@ public class VSRaftProtocol extends VSAbstractProtocol { leaderId = process.getProcessID(); lastHeartbeatTime = process.getTime(); isServer(true); + log("Leader elected: process " + leaderId + " (term " + currentTerm + ")"); if (!getLongKeySet().contains("heartbeatInterval")) { onServerInit(); diff --git a/src/main/java/simulator/builder/SimulationBuilder.java b/src/main/java/simulator/builder/SimulationBuilder.java index c35f0ea..cf5f962 100644 --- a/src/main/java/simulator/builder/SimulationBuilder.java +++ b/src/main/java/simulator/builder/SimulationBuilder.java @@ -20,7 +20,7 @@ import java.util.ArrayList; * .withProcesses(5) * .withProtocol("protocols.implementations.VSRaftProtocol") * .activateServers(0, 1, 2) - * .activateClients(3, 4) + * .activateClientsAt(1000, 3, 4) * .addCrashEvent(0, 2000) * .addRecoveryEvent(0, 3000) * .save("saved-simulations/my-raft.dat"); @@ -35,6 +35,7 @@ public class SimulationBuilder { private String protocolClass; private int numProcesses = 3; // default private List<ScheduledTask> scheduledTasks = new ArrayList<>(); + private Map<Integer, Map<String, Long>> protocolLongOverrides = new HashMap<>(); private int simulationDuration = 10000; // default 10 seconds /** @@ -99,6 +100,7 @@ public class SimulationBuilder { event.onInit(); // Initialize the event first setProtocolClassname(event, protocolClass); setIsServer(event, true); + attachProtocolOverrides(event, pid); scheduledTasks.add(new ScheduledTask(0, pid, event, true)); } @@ -109,18 +111,19 @@ public class SimulationBuilder { * Activate protocol as client on specified processes */ public SimulationBuilder activateClients(int... processIds) { - return activateClients(500, processIds); // default delay + return activateClientsAt(500L, processIds); // default delay } /** * Activate protocol as client on specified processes with custom start time */ - public SimulationBuilder activateClients(long startTime, int... processIds) { + public SimulationBuilder activateClientsAt(long startTime, int... processIds) { for (int i = 0; i < processIds.length; i++) { VSProtocolEvent event = new VSProtocolEvent(); event.onInit(); // Initialize the event first setProtocolClassname(event, protocolClass); setIsServer(event, false); + attachProtocolOverrides(event, processIds[i]); // Stagger client starts long time = startTime + (i * 200); @@ -128,6 +131,29 @@ public class SimulationBuilder { } return this; } + + /** + * Override a long preference on the protocol instance for a given process. + * + * @param processId the process index in the simulation + * @param key the protocol preference key + * @param value the value to apply + * @return this builder + */ + public SimulationBuilder setProtocolLong(int processId, String key, long value) { + protocolLongOverrides + .computeIfAbsent(Integer.valueOf(processId), pid -> new HashMap<>()) + .put(key, Long.valueOf(value)); + + for (ScheduledTask scheduledTask : scheduledTasks) { + if (scheduledTask.processId == processId && + scheduledTask.event instanceof VSProtocolEvent protocolEvent) { + protocolEvent.setLongOverride(key, value); + } + } + + return this; + } /** * Add a process crash event @@ -257,6 +283,12 @@ public class SimulationBuilder { // Initialize all events with their processes for (ScheduledTask st : scheduledTasks) { VSInternalProcess process = visualization.getProcess(st.processId); + if (process == null) { + throw new IllegalStateException( + "No process " + st.processId + " exists for " + + st.event.getClass().getSimpleName() + " at time " + + st.time); + } st.event.init(process); // For protocol events, update the shortname after init @@ -279,6 +311,20 @@ public class SimulationBuilder { taskManager.addTask(task); } } + + /** + * Apply any stored protocol overrides to a protocol activation event. + */ + private void attachProtocolOverrides(VSProtocolEvent event, int processId) { + Map<String, Long> longOverrides = protocolLongOverrides.get(Integer.valueOf(processId)); + if (longOverrides == null || longOverrides.isEmpty()) { + return; + } + + for (Map.Entry<String, Long> entry : longOverrides.entrySet()) { + event.setLongOverride(entry.getKey(), entry.getValue().longValue()); + } + } /** * Save the simulation to a file @@ -357,5 +403,6 @@ public class SimulationBuilder { public static final String ONE_PHASE_COMMIT = "protocols.implementations.VSOnePhaseCommitProtocol"; public static final String TWO_PHASE_COMMIT = "protocols.implementations.VSTwoPhaseCommitProtocol"; public static final String RELIABLE_MULTICAST = "protocols.implementations.VSReliableMulticastProtocol"; + public static final String RAFT = "protocols.implementations.VSRaftProtocol"; } -}
\ No newline at end of file +} diff --git a/src/main/java/simulator/builder/SimulationFactory.java b/src/main/java/simulator/builder/SimulationFactory.java index 2bd73b9..48ec638 100644 --- a/src/main/java/simulator/builder/SimulationFactory.java +++ b/src/main/java/simulator/builder/SimulationFactory.java @@ -51,7 +51,7 @@ public class SimulationFactory { .withProtocol(SimulationBuilder.Protocols.TWO_PHASE_COMMIT) .withDuration(10000) .activateServers(0) // Process 0 is coordinator - .activateClients(300, IntStream.range(1, numParticipants + 1).toArray()); + .activateClientsAt(300, IntStream.range(1, numParticipants + 1).toArray()); } /** @@ -80,4 +80,27 @@ public class SimulationFactory { .activateServers(0) // First process broadcasts .activateClients(IntStream.range(1, numProcesses).toArray()); } -}
\ No newline at end of file + + /** + * Create a Raft simulation with a leader crash and staggered follower + * activation so the election deadlines do not stay perfectly aligned. + * + * @return configured Raft simulation builder + */ + public static SimulationBuilder createRaftSimulation() throws Exception { + return new SimulationBuilder() + .withProcesses(3) + .withProtocol(SimulationBuilder.Protocols.RAFT) + .withDuration(30000) + .activateServers(0) + .activateClientsAt(100, 1) + .activateClientsAt(1700, 2) + // Bias process 1 toward a fast, clean post-crash election while + // keeping process 2's timeout comfortably behind it. + .setProtocolLong(1, "electionTimeout", 4000) + .setProtocolLong(1, "electionJitter", 0) + .setProtocolLong(2, "electionTimeout", 9000) + .setProtocolLong(2, "electionJitter", 0) + .addCrashEvent(0, 3500); + } +} diff --git a/src/main/java/testing/HeadlessLoader.java b/src/main/java/testing/HeadlessLoader.java index a19ec19..de18a93 100644 --- a/src/main/java/testing/HeadlessLoader.java +++ b/src/main/java/testing/HeadlessLoader.java @@ -47,32 +47,32 @@ public class HeadlessLoader { // Copy non-string values from serialized prefs for (String key : serializedPrefs.getIntegerKeySet()) { if (!key.startsWith("lang.")) { - newPrefs.initInteger(key, serializedPrefs.getInteger(key)); + newPrefs.setInteger(key, serializedPrefs.getInteger(key)); } } for (String key : serializedPrefs.getBooleanKeySet()) { if (!key.startsWith("lang.")) { - newPrefs.initBoolean(key, serializedPrefs.getBoolean(key)); + newPrefs.setBoolean(key, serializedPrefs.getBoolean(key)); } } for (String key : serializedPrefs.getFloatKeySet()) { if (!key.startsWith("lang.")) { - newPrefs.initFloat(key, serializedPrefs.getFloat(key)); + newPrefs.setFloat(key, serializedPrefs.getFloat(key)); } } for (String key : serializedPrefs.getColorKeySet()) { if (!key.startsWith("lang.")) { - newPrefs.initColor(key, serializedPrefs.getColor(key)); + newPrefs.setColor(key, serializedPrefs.getColor(key)); } } for (String key : serializedPrefs.getVectorKeySet()) { if (!key.startsWith("lang.")) { - newPrefs.initVector(key, serializedPrefs.getVector(key)); + newPrefs.setVector(key, serializedPrefs.getVector(key)); } } for (String key : serializedPrefs.getLongKeySet()) { if (!key.startsWith("lang.")) { - newPrefs.initLong(key, serializedPrefs.getLong(key)); + newPrefs.setLong(key, serializedPrefs.getLong(key)); } } @@ -86,6 +86,41 @@ public class HeadlessLoader { // Deserialize simulator simulator.deserialize(serializer, objectInputStream); objectInputStream.close(); + + // Reapply non-localized prefs after deserialization in case the + // simulator reset any persisted numeric values while restoring state. + VSPrefs simulatorPrefs = simulator.getPrefs(); + for (String key : serializedPrefs.getIntegerKeySet()) { + if (!key.startsWith("lang.")) { + simulatorPrefs.setInteger(key, serializedPrefs.getInteger(key)); + } + } + for (String key : serializedPrefs.getBooleanKeySet()) { + if (!key.startsWith("lang.")) { + simulatorPrefs.setBoolean(key, serializedPrefs.getBoolean(key)); + } + } + for (String key : serializedPrefs.getFloatKeySet()) { + if (!key.startsWith("lang.")) { + simulatorPrefs.setFloat(key, serializedPrefs.getFloat(key)); + } + } + for (String key : serializedPrefs.getColorKeySet()) { + if (!key.startsWith("lang.")) { + simulatorPrefs.setColor(key, serializedPrefs.getColor(key)); + } + } + for (String key : serializedPrefs.getVectorKeySet()) { + if (!key.startsWith("lang.")) { + simulatorPrefs.setVector(key, serializedPrefs.getVector(key)); + } + } + for (String key : serializedPrefs.getLongKeySet()) { + if (!key.startsWith("lang.")) { + simulatorPrefs.setLong(key, serializedPrefs.getLong(key)); + } + } + simulator.updateFromPrefs(); // Get the visualization using reflection Field vizField = VSSimulator.class.getDeclaredField("simulatorVisualization"); @@ -140,4 +175,4 @@ public class HeadlessLoader { return visualization; } } -}
\ No newline at end of file +} diff --git a/src/main/java/testing/HeadlessSimulationRunner.java b/src/main/java/testing/HeadlessSimulationRunner.java index 6279fa9..5a49487 100644 --- a/src/main/java/testing/HeadlessSimulationRunner.java +++ b/src/main/java/testing/HeadlessSimulationRunner.java @@ -86,7 +86,9 @@ public class HeadlessSimulationRunner { // Get the simulation's configured end time long untilTime = viz.getUntilTime(); - long actualMaxTime = Math.min(maxTime, untilTime); + long prefsUntilTime = simulator.getPrefs().getInteger("sim.seconds") * 1000L; + long actualUntilTime = Math.max(untilTime, prefsUntilTime); + long actualMaxTime = Math.min(maxTime, actualUntilTime); System.out.println("Running simulation for up to " + actualMaxTime + "ms (until time: " + untilTime + "ms)..."); @@ -347,4 +349,4 @@ public class HeadlessSimulationRunner { } } } -}
\ No newline at end of file +} |
