Merge pull request #8783 from GGraziadei/8710-flux-component-conf
Allow per-component configuration in Flux topology
diff --git a/.github/workflows/cypress-tests.yml b/.github/workflows/cypress-tests.yml
index 5f39e7e..26621b3 100644
--- a/.github/workflows/cypress-tests.yml
+++ b/.github/workflows/cypress-tests.yml
@@ -33,7 +33,7 @@
run:
working-directory: storm-webapp
steps:
- - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
- name: Set up Node.js
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 98666df..db1753c 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -27,7 +27,7 @@
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
- uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: ~/.m2/repository
@@ -71,7 +71,7 @@
experimental: [false]
fail-fast: false
steps:
- - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
- uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: ~/.m2/repository
diff --git a/.github/workflows/nightlies.yaml b/.github/workflows/nightlies.yaml
index f4ffcda..505c702 100644
--- a/.github/workflows/nightlies.yaml
+++ b/.github/workflows/nightlies.yaml
@@ -29,7 +29,7 @@
name: Upload to Nightly Builds
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
- uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: ~/.m2/repository
diff --git a/.github/workflows/snapshots.yaml b/.github/workflows/snapshots.yaml
index 37ebf40..73f514f 100644
--- a/.github/workflows/snapshots.yaml
+++ b/.github/workflows/snapshots.yaml
@@ -29,7 +29,7 @@
name: Publish Snapshots
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
- uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: ~/.m2/repository
diff --git a/.github/workflows/update-license-files.yml b/.github/workflows/update-license-files.yml
index 24a11b1..555bc61 100644
--- a/.github/workflows/update-license-files.yml
+++ b/.github/workflows/update-license-files.yml
@@ -28,7 +28,7 @@
update-license-files:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
- uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
diff --git a/AGENTS.md b/AGENTS.md
new file mode 100644
index 0000000..c6a5b2f
--- /dev/null
+++ b/AGENTS.md
@@ -0,0 +1,3 @@
+## Security
+
+See the [Security Model](https://storm.apache.org/security-model.html) for the project's threat model, in-scope / out-of-scope declarations, and known non-findings before reporting issues.
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index d5d6bb4..6f55bae 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -94,6 +94,9 @@
nimbus.credential.renewers.freq.secs: 600
nimbus.queue.size: 100000
scheduler.display.resource: false
+nimbus.even.rebalance.idle.supervisor.enabled: false
+nimbus.even.rebalance.max.free.per.topology: 0
+nimbus.even.rebalance.idle.supervisor.min.stable.rounds: 3
nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend"
nimbus.assignments.service.threads: 10
nimbus.assignments.service.thread.queue.size: 100
diff --git a/pom.xml b/pom.xml
index 5926042..0724796 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,6 +227,7 @@
<!-- exclude CHANGELOG, VERSION, AND TODO files -->
<exclude>**/CHANGELOG.md</exclude>
<exclude>**/README.md</exclude>
+ <exclude>**/AGENTS.md</exclude>
<exclude>**/README.markdown</exclude>
<exclude>**/DEVELOPER.md</exclude>
<exclude>**/RELEASING.md</exclude>
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 15924b1..6dcc2eb 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -176,6 +176,37 @@
public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource";
/**
+ * If true, {@link org.apache.storm.scheduler.EvenScheduler} may move already-assigned workers onto non-blacklisted supervisors
+ * with no slot in use. This lets a freshly returned supervisor pick up workers instead of staying idle. The number of workers
+ * freed per topology in a single scheduling round is capped by {@link #NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY}, so even
+ * distribution is approached gradually rather than rebuilt from scratch.
+ */
+ @IsBoolean
+ public static final String NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED
+ = "nimbus.even.rebalance.idle.supervisor.enabled";
+
+ /**
+ * Optional upper bound on the number of currently-assigned workers a single topology may release in one scheduling round
+ * when the idle-supervisor rebalance defined by {@link #NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} kicks in. The
+ * default budget already targets an even per-supervisor distribution (idle supervisors absorb roughly {@code numWorkers /
+ * numSupervisors} workers each in one round), capped by the idle side's free slot capacity. Setting this to a positive
+ * value tightens that budget; setting it to {@code 0} or a negative value adds no extra cap, so only the even-distribution budget applies.
+ */
+ @IsInteger
+ public static final String NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY
+ = "nimbus.even.rebalance.max.free.per.topology";
+
+ /**
+ * Minimum number of consecutive supervisor monitor rounds that a fully-idle supervisor must have been alive before
+ * {@link org.apache.storm.scheduler.EvenScheduler} can relocate workers onto it. A positive value avoids moving workers onto a
+ * supervisor that has only just returned and may still be flapping. Setting this to {@code 0} or a negative value disables the
+ * uptime guard.
+ */
+ @IsInteger
+ public static final String NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS
+ = "nimbus.even.rebalance.idle.supervisor.min.stable.rounds";
+
+ /**
* The directory where storm's health scripts go.
*/
@IsString
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 103eb00..0463c78 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -985,11 +985,20 @@
String id = entry.getKey();
SupervisorInfo info = entry.getValue();
ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(),
- info.get_scheduler_meta(), null, info.get_resources_map()));
+ info.get_scheduler_meta(), null, info.get_resources_map(),
+ supervisorUptimeSecs(info)));
}
return ret;
}
+ private static long supervisorUptimeSecs(SupervisorInfo info) {
+ // Deliberately conservative: a supervisor that reports no uptime is treated as having just (re)registered, i.e. 0L
+ // rather than the Long.MAX_VALUE default of the uptime-less SupervisorDetails constructors. It then has to accrue
+ // real uptime before Cluster#hasMinimumIdleSupervisorStability allows the idle rebalance to place workers on it.
+ if (info.is_set_uptime_secs()) {
+ return info.get_uptime_secs();
+ }
+ return 0L;
+ }
+
/**
* NOTE: this can return false when a topology has just been activated. The topology may still be
* in the STORMS_SUBTREE.
@@ -2273,7 +2282,8 @@
List<SupervisorDetails> superDetails = new ArrayList<>();
for (Entry<String, SupervisorInfo> entry : superInfos.entrySet()) {
SupervisorInfo info = entry.getValue();
- superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map()));
+ superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map(),
+ supervisorUptimeSecs(info)));
}
// Note that allSlotsAvailableForScheduling
// only uses the supervisor-details. The rest of the arguments
@@ -2306,7 +2316,7 @@
allPorts.removeAll(deadPorts);
}
ret.put(superId, new SupervisorDetails(superId, hostname, info.get_scheduler_meta(),
- allPorts, info.get_resources_map()));
+ allPorts, info.get_resources_map(), supervisorUptimeSecs(info)));
}
return ret;
}
@@ -5526,4 +5536,3 @@
}
}
}
-
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 1a8879d..c01b6fb 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -363,6 +363,42 @@
return desiredNumWorkers > assignedNumWorkers || getUnassignedExecutors(topology).size() > 0;
}
+ /**
+ * Returns true when {@code supervisor} is a stable, non-blacklisted supervisor whose slots are all currently free --
+ * i.e. a returning idle supervisor the {@link EvenScheduler} idle-rebalance pass may relocate workers onto. The check
+ * is binary by design -- a supervisor either has zero used slots or it does not -- so the rebalance never fires for an
+ * "almost balanced" cluster. Stability is gated by {@link #hasMinimumIdleSupervisorStability(SupervisorDetails)} so a
+ * supervisor that has only just returned (and may still be flapping) is held back until it has been up long enough. The
+ * opt-in {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} flag is checked once by the caller
+ * ({@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)}), not here.
+ */
+ public boolean isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails supervisor) {
+ if (supervisor == null) {
+ return false;
+ }
+ if (isBlackListed(supervisor.getId())) {
+ return false;
+ }
+ if (supervisor.getAllPorts().isEmpty()) {
+ return false;
+ }
+ if (!getUsedPorts(supervisor).isEmpty()) {
+ return false;
+ }
+ return hasMinimumIdleSupervisorStability(supervisor);
+ }
+
+ /**
+ * Uptime guard for the idle rebalance: the supervisor must have been up for at least
+ * {@code minStableRounds * max(1, supervisorMonitorFrequencySecs)} seconds. Both settings fall back to 3 when absent
+ * from the configuration; a non-positive round count disables the guard.
+ */
+ private boolean hasMinimumIdleSupervisorStability(SupervisorDetails supervisor) {
+ Object roundsSetting = conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS);
+ int stableRounds = ObjectReader.getInt(roundsSetting, 3);
+ if (stableRounds > 0) {
+ Object frequencySetting = conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS);
+ int roundLengthSecs = Math.max(1, ObjectReader.getInt(frequencySetting, 3));
+ // Widen to long before multiplying so large settings cannot overflow int.
+ return supervisor.getUptimeSecs() >= (long) stableRounds * roundLengthSecs;
+ }
+ return true;
+ }
+
@Override
public boolean needsSchedulingRas(TopologyDetails topology) {
return getUnassignedExecutors(topology).size() > 0;
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 81a0ad8..7f6cc9c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -72,7 +72,16 @@
}
public static void defaultSchedule(Topologies topologies, Cluster cluster) {
+ EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
+ // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
+ // topologies passed in: DefaultScheduler.schedule passes the full set (so the guard is a no-op), while
+ // IsolationScheduler delegates only its leftover, non-isolated topologies here. redistributeOntoIdleSupervisors
+ // above acted only on that passed-in set too. Skip topologies outside it so the leftover path never schedules
+ // one the caller excluded -- e.g. a down isolated topology on a reserved host.
+ if (topologies.getById(topology.getId()) == null) {
+ continue;
+ }
List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
Set<ExecutorDetails> allExecutors = topology.getExecutors();
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index ccc2e34..69d62d6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -18,18 +18,23 @@
package org.apache.storm.scheduler;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
@@ -101,6 +106,186 @@
return Utils.reverseMap(executorToSlot);
}
+ /**
+ * Round-robin relocation of currently-assigned workers onto fully-idle supervisors. Each round-robin iteration moves
+ * at most one worker per topology, so multiple topologies share the idle slots and a single returning supervisor ends
+ * up hosting workers from several topologies — preserving the per-supervisor workload diversity that a fresh
+ * cluster has after submission.
+ *
+ * <p>Per-topology cap in one scheduling round is
+ * {@code idleSupervisorCount * floor(numWorkers / nonBlacklistedSupervisorCount)}, further tightened by
+ * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY} when set to a positive value. Topologies whose
+ * computed cap is zero (typically {@code numWorkers < numSupervisors}) are skipped entirely. The trigger remains
+ * binary — only fires when at least one supervisor has zero used slots — so a near-balanced cluster sees no
+ * movement. {@code numWorkers} here is the topology's <em>declared</em> worker count, so the cap is an upper bound,
+ * not a guarantee: for an under-assigned topology the donor guard in {@link #relocateOneWorkerOntoIdleSlot} — which
+ * never drains a source supervisor below one worker — can keep the actual number of relocations below it.
+ *
+ * <p>Workers are always pulled from the supervisor where this topology has the most workers, and only when that
+ * supervisor would still hold at least one worker afterward. Each pulled worker's executors are placed directly
+ * onto an idle slot, so the subsequent sortSlots / interleave pass cannot drop them back into the just-vacated
+ * slots. Ties between equally loaded source supervisors are resolved by supervisor id, lexicographically.
+ *
+ * <p>Gated by {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}: when disabled (the default) the
+ * method returns before scanning any supervisor, so a cluster that has not opted in pays no per-scheduling-round cost.
+ *
+ * <p>This is the package-private entry point shared by {@link #scheduleTopologiesEvenly(Topologies, Cluster)} and
+ * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}; its visibility is dictated by those callers, not by
+ * tests (which also reach it from the same package).
+ *
+ * @param topologies the topologies this scheduling run is scoped to; only these are considered for relocation
+ * @param cluster the cluster whose configuration is read and whose assignments are mutated in place
+ */
+ static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) {
+ if (!ObjectReader.getBoolean(
+ cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) {
+ return;
+ }
+ int nonBlacklistedSupervisorCount = 0;
+ int idleSupervisorCount = 0;
+ Deque<WorkerSlot> idleTargets = new ArrayDeque<>();
+ Set<String> idleSupervisorIds = new HashSet<>();
+ // Walk supervisors in id order so the idle-slot queue, and therefore placement, is deterministic.
+ List<SupervisorDetails> supervisors = new ArrayList<>(cluster.getSupervisors().values());
+ supervisors.sort(Comparator.comparing(SupervisorDetails::getId));
+ for (SupervisorDetails s : supervisors) {
+ if (cluster.isBlackListed(s.getId())) {
+ continue;
+ }
+ if (s.getAllPorts().isEmpty()) {
+ continue;
+ }
+ // Counted only after the blacklist / no-port filters: this is the divisor of the per-topology budget below.
+ nonBlacklistedSupervisorCount++;
+ if (cluster.isIdleSupervisorAvailableForEvenRebalance(s)) {
+ idleSupervisorCount++;
+ idleSupervisorIds.add(s.getId());
+ // Queue every port of the idle supervisor, lowest port first.
+ List<Integer> ports = new ArrayList<>(s.getAllPorts());
+ Collections.sort(ports);
+ for (Integer port : ports) {
+ idleTargets.add(new WorkerSlot(s.getId(), port));
+ }
+ }
+ }
+ if (idleTargets.isEmpty() || nonBlacklistedSupervisorCount == 0 || idleSupervisorCount == 0) {
+ return;
+ }
+
+ // Optional operator cap; values <= 0 leave the computed budget untouched (see the maxFree > 0 check below).
+ int maxFree = ObjectReader.getInt(
+ cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY), 0);
+
+ List<TopologyDetails> orderedTopos = new ArrayList<>();
+ Map<String, Integer> remainingBudget = new HashMap<>();
+ for (TopologyDetails topo : topologies.getTopologies()) {
+ // Skip topologies already present on every idle supervisor -- relocating gains them no workload diversity.
+ // Reuses the idle-supervisor set computed above instead of re-scanning every supervisor for each topology.
+ // NOTE(review): every id in idleSupervisorIds passed a check that requires no used ports, so this filter
+ // presumably never rejects a topology at this point -- confirm against Cluster#getUsedSlotsByTopologyId.
+ if (!topologyCanReuseIdleSupervisor(cluster, topo, idleSupervisorIds)) {
+ continue;
+ }
+ // Integer division floors numWorkers / supervisors; the result is scaled by the number of idle supervisors.
+ int target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCount;
+ if (target <= 0) {
+ continue;
+ }
+ if (maxFree > 0) {
+ target = Math.min(target, maxFree);
+ }
+ orderedTopos.add(topo);
+ remainingBudget.put(topo.getId(), target);
+ }
+ if (orderedTopos.isEmpty()) {
+ return;
+ }
+ // Fixed topology order keeps the round-robin below deterministic.
+ orderedTopos.sort(Comparator.comparing(TopologyDetails::getId));
+
+ int totalRelocated = 0;
+ // Round-robin: each pass offers every topology with budget left one relocation, until slots or budgets run out.
+ while (!idleTargets.isEmpty()) {
+ boolean movedThisIteration = false;
+ for (TopologyDetails topo : orderedTopos) {
+ if (idleTargets.isEmpty()) {
+ break;
+ }
+ int remaining = remainingBudget.getOrDefault(topo.getId(), 0);
+ if (remaining <= 0) {
+ continue;
+ }
+ if (relocateOneWorkerOntoIdleSlot(topo, cluster, idleTargets)) {
+ remainingBudget.put(topo.getId(), remaining - 1);
+ totalRelocated++;
+ movedThisIteration = true;
+ } else {
+ // No eligible source for this topology: zero its budget so later passes skip it.
+ remainingBudget.put(topo.getId(), 0);
+ }
+ }
+ // A full pass without a single relocation means nothing further can move.
+ if (!movedThisIteration) {
+ break;
+ }
+ }
+ if (totalRelocated > 0) {
+ LOG.info("EvenScheduler: relocated {} worker(s) onto idle supervisor(s) round-robin across {} topologies.",
+ totalRelocated, orderedTopos.size());
+ }
+ }
+
+ /**
+ * Per-topology half of the binary idle-rebalance trigger: reports whether {@code topology} is absent from at least
+ * one of the already-identified idle supervisors, i.e. whether relocating onto it could add workload diversity.
+ *
+ * <p>Works on the pre-computed {@code idleSupervisorIds} (each already vetted by
+ * {@link Cluster#isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails)}), so the per-topology loop does not
+ * rescan every supervisor. An empty {@code idleSupervisorIds} yields {@code false}.
+ */
+ private static boolean topologyCanReuseIdleSupervisor(Cluster cluster, TopologyDetails topology,
+ Set<String> idleSupervisorIds) {
+ Set<String> occupiedNodes = new HashSet<>();
+ cluster.getUsedSlotsByTopologyId(topology.getId()).forEach(slot -> occupiedNodes.add(slot.getNodeId()));
+ // "Some idle supervisor is not yet used" is the negation of "every idle supervisor is already used".
+ return !occupiedNodes.containsAll(idleSupervisorIds);
+ }
+
+ /**
+ * Pulls a single worker from the supervisor where {@code topology} currently has the most workers and reassigns its
+ * executors onto the next idle slot from {@code idleTargets}.
+ *
+ * <p>A supervisor is an eligible source only when (a) it hosts at least two of the topology's workers, so it is never
+ * drained to zero and turned into the next round's idle supervisor, and (b) it is not the supervisor that owns the
+ * next idle target. Condition (b) matters because a formerly idle supervisor that has already received workers during
+ * this pass appears in the topology's assignment; without it a worker could be moved between two slots of the same
+ * supervisor, consuming an idle slot and relocation budget for no change in per-supervisor distribution. Ties
+ * between equally loaded sources are resolved by supervisor id, and the worker on the highest port of the chosen
+ * source is the one moved.
+ *
+ * @param topology the topology whose worker is relocated
+ * @param cluster the cluster whose assignment is mutated
+ * @param idleTargets remaining idle slots, consumed from the head; left untouched when nothing is relocated
+ * @return {@code true} if a worker was moved; {@code false} if there is no idle target or no eligible source
+ */
+ private static boolean relocateOneWorkerOntoIdleSlot(TopologyDetails topology, Cluster cluster,
+ Deque<WorkerSlot> idleTargets) {
+ // Peek rather than poll: the target is consumed only once a source worker has actually been found.
+ WorkerSlot target = idleTargets.peek();
+ if (target == null) {
+ return false;
+ }
+ Map<WorkerSlot, List<ExecutorDetails>> slotToExecutors =
+ getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
+ Map<String, List<WorkerSlot>> nodeToSlots = new HashMap<>();
+ for (WorkerSlot slot : slotToExecutors.keySet()) {
+ nodeToSlots.computeIfAbsent(slot.getNodeId(), k -> new ArrayList<>()).add(slot);
+ }
+ List<Map.Entry<String, List<WorkerSlot>>> candidates = new ArrayList<>(nodeToSlots.entrySet());
+ // Drop sources that would be drained to zero, and the target's own supervisor (a same-node move is pure churn).
+ candidates.removeIf(e -> e.getValue().size() < 2 || e.getKey().equals(target.getNodeId()));
+ if (candidates.isEmpty()) {
+ return false;
+ }
+ // Most-loaded source first; equal loads are ordered by supervisor id.
+ candidates.sort(Comparator
+ .<Map.Entry<String, List<WorkerSlot>>>comparingInt(e -> e.getValue().size())
+ .reversed()
+ .thenComparing(Map.Entry::getKey));
+ List<WorkerSlot> slots = candidates.get(0).getValue();
+ slots.sort(Comparator.comparingInt(WorkerSlot::getPort));
+ WorkerSlot victim = slots.get(slots.size() - 1);
+ Collection<ExecutorDetails> execs = slotToExecutors.get(victim);
+ if (execs == null || execs.isEmpty()) {
+ return false;
+ }
+ // Same element that was peeked above; nothing in between touches the queue.
+ idleTargets.poll();
+ // freeSlot-then-assign is intentionally ordered and non-atomic. target is a pre-verified fully-idle slot --
+ // idleTargets only holds ports from supervisors that passed Cluster#isIdleSupervisorAvailableForEvenRebalance,
+ // which requires getUsedPorts to be empty -- so assign cannot hit its slot-occupied path and no rollback is
+ // needed. In the near-impossible event assign threw here, the freed executors are picked up by the regular
+ // scheduling pass on the same round.
+ cluster.freeSlot(victim);
+ cluster.assign(target, topology.getId(), execs);
+ return true;
+ }
+
private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
Set<ExecutorDetails> allExecutors = topology.getExecutors();
@@ -148,7 +333,16 @@
}
public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
+ redistributeOntoIdleSupervisors(topologies, cluster);
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
+ // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
+ // topologies passed in: EvenScheduler.schedule passes the full set (so the guard is a no-op), while
+ // DefaultScheduler.defaultSchedule calls us once per leftover topology with a single-topology Topologies.
+ // redistributeOntoIdleSupervisors above acted only on that passed-in set too. Skip topologies outside it so
+ // the leftover path never schedules one the caller excluded -- e.g. a down isolated topology on a reserved host.
+ if (topologies.getById(topology.getId()) == null) {
+ continue;
+ }
String topologyId = topology.getId();
Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 1886273..f03e1de 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -46,6 +46,7 @@
* all the ports of the supervisor.
*/
private Set<Integer> allPorts;
+ private final long uptimeSecs;
/**
* Create the details of a new supervisor.
@@ -59,12 +60,22 @@
*/
public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta,
Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
+ // Callers that do not supply uptime (tests and every path other than the idle-supervisor rebalance) default to
+ // Long.MAX_VALUE, i.e. treated as indefinitely stable so the rebalance flap guard never holds them back. This is
+ // the deliberate opposite of Nimbus#supervisorUptimeSecs, which maps an unset uptime to 0L; production always
+ // supplies the real uptime through that path, so this default only affects non-feature callers.
+ this(id, serverPort, host, meta, schedulerMeta, allPorts, totalResources, Long.MAX_VALUE);
+ }
+
+ public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta,
+ Collection<? extends Number> allPorts, Map<String, Double> totalResources, long uptimeSecs) {
this.id = id;
this.serverPort = serverPort;
this.host = host;
this.meta = meta;
this.schedulerMeta = schedulerMeta;
+ this.uptimeSecs = uptimeSecs;
if (allPorts != null) {
setAllPorts(allPorts);
} else {
@@ -82,6 +93,10 @@
this(id, null, null, meta, null, null, totalResources);
}
+ /**
+ * Create supervisor details from an id, metadata and total resources, with an explicit uptime. Server port, host,
+ * scheduler metadata and ports are passed as {@code null} to the full constructor.
+ *
+ * @param id the supervisor id
+ * @param meta the supervisor metadata object, passed through unchanged
+ * @param totalResources the supervisor's total resources, passed through unchanged
+ * @param uptimeSecs the supervisor uptime in seconds, as returned later by {@link #getUptimeSecs()}
+ */
+ public SupervisorDetails(String id, Object meta, Map<String, Double> totalResources, long uptimeSecs) {
+ this(id, null, null, meta, null, null, totalResources, uptimeSecs);
+ }
+
public SupervisorDetails(String id, Object meta, Collection<? extends Number> allPorts) {
this(id, null, null, meta, null, allPorts, null);
}
@@ -95,11 +110,21 @@
this(id, null, host, null, schedulerMeta, allPorts, totalResources);
}
+ /**
+ * Create supervisor details from an id, host, scheduler metadata, ports and total resources, with an explicit
+ * uptime. Server port and metadata are passed as {@code null} to the full constructor.
+ *
+ * @param id the supervisor id
+ * @param host the supervisor host
+ * @param schedulerMeta the scheduler metadata object, passed through unchanged
+ * @param allPorts the supervisor's ports; may be {@code null}
+ * @param totalResources the supervisor's total resources, passed through unchanged
+ * @param uptimeSecs the supervisor uptime in seconds, as returned later by {@link #getUptimeSecs()}
+ */
+ public SupervisorDetails(String id, String host, Object schedulerMeta,
+ Collection<? extends Number> allPorts, Map<String, Double> totalResources, long uptimeSecs) {
+ this(id, null, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs);
+ }
+
public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta,
Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources);
}
+ /**
+ * Create supervisor details from an id, server port, host, scheduler metadata, ports and total resources, with an
+ * explicit uptime. Metadata is passed as {@code null} to the full constructor.
+ *
+ * @param id the supervisor id
+ * @param serverPort the supervisor's server port
+ * @param host the supervisor host
+ * @param schedulerMeta the scheduler metadata object, passed through unchanged
+ * @param allPorts the supervisor's ports; may be {@code null}
+ * @param totalResources the supervisor's total resources, passed through unchanged
+ * @param uptimeSecs the supervisor uptime in seconds, as returned later by {@link #getUptimeSecs()}
+ */
+ public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta,
+ Collection<? extends Number> allPorts, Map<String, Double> totalResources, long uptimeSecs) {
+ this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs);
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta
@@ -126,6 +151,10 @@
return allPorts;
}
+ /**
+ * Get the uptime this instance was constructed with.
+ *
+ * @return the supervisor uptime in seconds; {@code Long.MAX_VALUE} when the instance was built through a constructor
+ *     that does not take an uptime
+ */
+ public long getUptimeSecs() {
+ return uptimeSecs;
+ }
+
private void setAllPorts(Collection<? extends Number> allPorts) {
this.allPorts = new HashSet<>();
if (allPorts != null) {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java
new file mode 100644
index 0000000..c5849c6
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.blacklist.TestUtilsForBlacklistScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for the idle-supervisor rebalance behavior added to
+ * {@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)} and its per-supervisor eligibility predicate
+ * {@link Cluster#isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails)}.
+ *
+ * <p>Trigger condition is binary: at least one non-blacklisted supervisor with zero used slots must exist. The cluster
+ * being "almost balanced" never triggers the new logic, so a near-even distribution is preserved as-is. Each round only
+ * frees up to {@code nimbus.even.rebalance.max.free.per.topology} workers and never drains a supervisor down to zero.
+ * The tests assert on the observable effect of {@code redistributeOntoIdleSupervisors} (the resulting assignment) rather
+ * than on any internal boolean predicate.
+ */
+public class TestEvenSchedulerIdleSupervisor {
+
+ private static final String TOPO_ID = "topo-1";
+
+    /**
+     * Default fixture: sup-0 hosts two workers of the topology, sup-1 hosts one, and sup-2 starts out idle.
+     */
+    private Cluster buildClusterWithIdleSupervisor(boolean enableRebalance, int maxFreePerTopology) {
+        Map<String, Object> conf = evenRebalanceConf(enableRebalance, maxFreePerTopology);
+        return buildClusterWithIdleSupervisor(TestUtilsForBlacklistScheduler.genSupervisors(3, 4), conf);
+    }
+
+    /**
+     * Builds a cluster from {@code supMap} and {@code conf} in which TOPO_ID already runs on sup-0 and sup-1 only.
+     */
+    private Cluster buildClusterWithIdleSupervisor(Map<String, SupervisorDetails> supMap, Map<String, Object> conf) {
+        // Three slots are occupied: two on sup-0 and one on sup-1; no worker is placed on any other supervisor.
+        TopologyDetails topology = makeTopologyDetails(TOPO_ID, 3);
+
+        // Delegate to buildAssignment instead of repeating its logic here: it orders the executors by start task
+        // and deals them round-robin over the given slots, which is exactly the placement this fixture needs and
+        // keeps it in step with the assignments the individual tests build through the same helper.
+        WorkerSlot[] occupiedSlots = {
+            new WorkerSlot("sup-0", 0),
+            new WorkerSlot("sup-0", 1),
+            new WorkerSlot("sup-1", 0),
+        };
+        SchedulerAssignmentImpl assignment = buildAssignment(topology, occupiedSlots);
+
+        Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>();
+        assignments.put(TOPO_ID, assignment);
+
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(TOPO_ID, topology);
+        Topologies topologies = new Topologies(topoMap);
+
+        // supMap and conf come from the caller so tests can vary supervisor uptime and scheduler settings.
+        return newCluster(supMap, assignments, topologies, conf);
+    }
+
+    private Map<String, Object> evenRebalanceConf(boolean enableRebalance, int maxFreePerTopology) {
+        Map<String, Object> settings = new HashMap<>();
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, enableRebalance);
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, maxFreePerTopology);
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 0); // raised by the flap-guard test
+        settings.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3);
+        return settings;
+    }
+
+    private Map<String, SupervisorDetails> genSupervisorsWithUptime(int numSup, int numPorts, long uptimeSecs) {
+        Map<String, SupervisorDetails> supervisorsById = new HashMap<>();
+        for (int index = 0; index < numSup; index++) {
+            SupervisorDetails details = supervisor("sup-" + index, "host-" + index, numPorts, uptimeSecs);
+            supervisorsById.put(details.getId(), details);
+        }
+        return supervisorsById;
+    }
+
+    private SupervisorDetails supervisor(String id, String host, int numPorts, long uptimeSecs) {
+        List<Number> portNumbers = new LinkedList<>();
+        for (int next = 0; next < numPorts; next++) {
+            portNumbers.add(Integer.valueOf(next));
+        }
+        return new SupervisorDetails(id, host, null, portNumbers, null, uptimeSecs);
+    }
+
+    private Cluster newCluster(Map<String, SupervisorDetails> supMap,
+                               Map<String, SchedulerAssignmentImpl> assignments,
+                               Topologies topologies,
+                               Map<String, Object> conf) {
+        // Every cluster gets a fresh metrics registry and an INimbusTest from TestUtilsForBlacklistScheduler.
+        ResourceMetrics metrics = new ResourceMetrics(new StormMetricsRegistry());
+        TestUtilsForBlacklistScheduler.INimbusTest nimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        return new Cluster(nimbus, metrics, supMap, assignments, topologies, conf);
+    }
+
+    private TopologyDetails makeTopologyDetails(String id, int numWorkers, int parallelism) {
+        // One spout feeding one shuffle-grouped bolt, each declared with the requested parallelism.
+        TopologyBuilder wiring = new TopologyBuilder();
+        wiring.setSpout("spout-0", new TestUtilsForBlacklistScheduler.TestSpout(), parallelism);
+        wiring.setBolt("bolt-0", new TestUtilsForBlacklistScheduler.TestBolt(), parallelism).shuffleGrouping("spout-0");
+        StormTopology stormTopology = wiring.createTopology();
+
+        Config topoConf = new Config();
+        topoConf.put(Config.TOPOLOGY_NAME, id);
+        topoConf.put(Config.TOPOLOGY_WORKERS, numWorkers);
+        Map<ExecutorDetails, String> executorToComponent =
+            TestUtilsForBlacklistScheduler.genExecsAndComps(stormTopology, parallelism, parallelism);
+        return new TopologyDetails(id, topoConf, stormTopology, numWorkers, executorToComponent, 0, "user");
+    }
+
+ private TopologyDetails makeTopologyDetails(String id, int numWorkers) {
+ return makeTopologyDetails(id, numWorkers, 3); // parallelism 3 is applied to both the spout and the bolt
+ }
+
+ private TopologyDetails firstTopology(Cluster cluster) {
+ return cluster.getTopologies().getById(TOPO_ID); // looks up the TOPO_ID fixture topology, not an arbitrary "first" one
+ }
+
+    private int usedSlotCount(Cluster cluster, String supervisorId) {
+        // Number of ports on the given supervisor that the cluster currently reports as used.
+        return cluster.getUsedPorts(cluster.getSupervisorById(supervisorId)).size();
+    }
+
+    @Test
+    public void disabledByDefault_doesNotTrigger() {
+        Cluster disabled = buildClusterWithIdleSupervisor(false, 1);
+        Topologies scheduled = disabled.getTopologies();
+        EvenScheduler.redistributeOntoIdleSupervisors(scheduled, disabled);
+
+        // With the flag off the rebalance is a no-op even though sup-2 is idle: the 2/1/0 layout must survive.
+        assertEquals(2, usedSlotCount(disabled, "sup-0"));
+        assertEquals(1, usedSlotCount(disabled, "sup-1"));
+        assertEquals(0, usedSlotCount(disabled, "sup-2"));
+        assertFalse(disabled.needsScheduling(firstTopology(disabled)),
+            "needsScheduling must remain false when the new behavior is disabled and the topology is fully assigned");
+    }
+
+    @Test
+    public void enabledWithIdleSupervisor_doesNotChangeGenericNeedsScheduling() {
+        Cluster enabled = buildClusterWithIdleSupervisor(true, 1);
+        TopologyDetails topo = firstTopology(enabled);
+        // Before the rebalance runs: switching the opt-in feature on must not flip either generic scheduling trigger.
+        assertFalse(enabled.needsScheduling(topo),
+            "needsScheduling is used by schedulers other than EvenScheduler; the idle trigger stays out of that generic path");
+        assertFalse(enabled.needsSchedulingRas(topo),
+            "ResourceAwareScheduler keeps using needsSchedulingRas, so this opt-in EvenScheduler feature is out of RAS scope");
+
+        EvenScheduler.redistributeOntoIdleSupervisors(enabled.getTopologies(), enabled);
+
+        // The rebalance itself does fire: exactly one worker lands on the previously idle sup-2...
+        assertEquals(1, usedSlotCount(enabled, "sup-2"));
+        assertEquals(3, enabled.getAssignedNumWorkers(topo));
+        // ...and because the topology is still fully assigned, both generic triggers remain false afterwards.
+        assertFalse(enabled.needsScheduling(topo));
+        assertFalse(enabled.needsSchedulingRas(topo));
+    }
+
+    @Test
+    public void noIdleSupervisor_doesNotTrigger() {
+        // Only two supervisors exist and both already run workers of the topology, so no supervisor is idle.
+        TopologyDetails topo = makeTopologyDetails(TOPO_ID, 3);
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(2, 4);
+
+        WorkerSlot[] occupied = {
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), new WorkerSlot("sup-1", 0),
+        };
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(TOPO_ID, buildAssignment(topo, occupied));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(TOPO_ID, topo);
+        Topologies topologies = new Topologies(topologyById);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 1));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(cluster.getTopologies(), cluster);
+
+        // Without a supervisor at zero used slots the binary trigger cannot fire, so nothing may have moved.
+        assertEquals(2, usedSlotCount(cluster, "sup-0"));
+        assertEquals(1, usedSlotCount(cluster, "sup-1"));
+        assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)));
+        assertFalse(cluster.needsScheduling(firstTopology(cluster)));
+    }
+
+    @Test
+    public void redistributeRelocatesAtMostMaxFreeWorkersPerTopology() {
+        Cluster underTest = buildClusterWithIdleSupervisor(true, 1);
+        assertEquals(2, usedSlotCount(underTest, "sup-0"));
+        assertEquals(1, usedSlotCount(underTest, "sup-1"));
+        assertEquals(0, usedSlotCount(underTest, "sup-2"));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(underTest.getTopologies(), underTest);
+
+        // A max-free of 1 allows exactly one relocation for the topology: the worker is taken from sup-0, the
+        // supervisor carrying the most workers, and lands on the idle sup-2.
+        assertEquals(1, usedSlotCount(underTest, "sup-0"));
+        assertEquals(1, usedSlotCount(underTest, "sup-1"));
+        assertEquals(1, usedSlotCount(underTest, "sup-2"));
+        assertEquals(3, underTest.getAssignedNumWorkers(firstTopology(underTest)));
+    }
+
+    @Test
+    public void redistributeNeverDrainsSupervisorToZero() {
+        TopologyDetails topo = makeTopologyDetails(TOPO_ID, 2);
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+
+        WorkerSlot[] occupied = {
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-1", 0),
+        };
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(TOPO_ID, buildAssignment(topo, occupied));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(TOPO_ID, topo);
+        Topologies topologies = new Topologies(topologyById);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 5));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // Budget is floor(2 / 3) = 0, so the topology is skipped entirely and neither donor loses its only worker.
+        assertEquals(1, usedSlotCount(cluster, "sup-0"));
+        assertEquals(1, usedSlotCount(cluster, "sup-1"));
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+    }
+
+    @Test
+    public void scheduleTopologiesEvenly_movesOneWorkerToIdleSupervisor() {
+        Cluster cluster = buildClusterWithIdleSupervisor(true, 1);
+
+        EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster);
+
+        // Once scheduling has run, the formerly idle sup-2 must host exactly one worker.
+        assertEquals(1, usedSlotCount(cluster, "sup-2"));
+        // Across all three supervisors the worker total stays at 3, matching the topology's numWorkers.
+        int occupied = 0;
+        for (String supervisorId : new String[]{"sup-0", "sup-1", "sup-2"}) {
+            occupied += usedSlotCount(cluster, supervisorId);
+        }
+        assertEquals(3, occupied);
+        assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)),
+            "relocation must preserve the topology's declared worker count, not just keep 3 slots occupied");
+    }
+
+    /**
+     * A one-worker topology must stay put however many supervisors are idle: {@code floor(1 / N) = 0} for every N >= 2,
+     * so the drain budget is zero. Without that guard the lone worker would ping-pong between supervisors on every
+     * monitor cycle.
+     */
+    @Test
+    public void singleWorkerTopology_doesNotMoveDespiteIdleSupervisors() {
+        String singleId = "topo-single";
+        TopologyDetails topo = makeTopologyDetails(singleId, 1, 1);
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(singleId, buildAssignment(topo, new WorkerSlot[]{ new WorkerSlot("sup-0", 0) }));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(singleId, topo);
+        Topologies topologies = new Topologies(topologyById);
+
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        EvenScheduler.scheduleTopologiesEvenly(topologies, cluster);
+
+        assertEquals(1, usedSlotCount(cluster, "sup-0"));
+        assertEquals(0, usedSlotCount(cluster, "sup-1"));
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+    }
+
+    /**
+     * An 8-worker topology starts out as (4, 4, 0). With max-free left unbounded the budget for the idle supervisor is
+     * floor(numWorkers / numSupervisors) = 2 workers, so the intended end state of the round is (3, 3, 2). A second
+     * pass must then move nothing, because no supervisor is idle any more.
+     */
+    @Test
+    public void evenDistributionInOneRound_unboundedMaxFree() {
+        String evenId = "topo-even";
+        TopologyDetails topo = makeTopologyDetails(evenId, 8, 4);
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+
+        WorkerSlot[] occupied = new WorkerSlot[8];
+        for (int port = 0; port < 4; port++) {
+            occupied[port] = new WorkerSlot("sup-0", port);
+            occupied[4 + port] = new WorkerSlot("sup-1", port);
+        }
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(evenId, buildAssignment(topo, occupied));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(evenId, topo);
+        Topologies topologies = new Topologies(topologyById);
+
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        assertEquals(4, usedSlotCount(cluster, "sup-0"));
+        assertEquals(4, usedSlotCount(cluster, "sup-1"));
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+
+        EvenScheduler.scheduleTopologiesEvenly(topologies, cluster);
+
+        // One round hands the idle supervisor exactly floor(8/3) = 2 workers without losing any worker overall.
+        int onIdle = usedSlotCount(cluster, "sup-2");
+        assertEquals(2, onIdle);
+        int onDonors = usedSlotCount(cluster, "sup-0") + usedSlotCount(cluster, "sup-1");
+        assertEquals(8, onDonors + onIdle);
+        assertEquals(8, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(evenId)),
+            "relocation must preserve the declared worker count of an 8-worker topology");
+        // sup-2 is no longer idle, so running the rebalance again must not relocate anything.
+        EvenScheduler.redistributeOntoIdleSupervisors(cluster.getTopologies(), cluster);
+        int onIdleAfter = usedSlotCount(cluster, "sup-2");
+        assertEquals(2, onIdleAfter);
+        int onDonorsAfter = usedSlotCount(cluster, "sup-0") + usedSlotCount(cluster, "sup-1");
+        assertEquals(8, onDonorsAfter + onIdleAfter);
+    }
+
+    /**
+     * Two topologies of equal size take turns claiming the returning supervisor's slots: each gives up one worker, so
+     * sup-2 ends up running workers of both topologies -- the same workload mix per supervisor that a fresh submission
+     * would produce.
+     */
+    @Test
+    public void multipleTopologies_shareIdleSlotsRoundRobin() {
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+
+        TopologyDetails topologyA = makeTopologyDetails("topo-A", 4, 2);
+        TopologyDetails topologyB = makeTopologyDetails("topo-B", 4, 2);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put("topo-A", buildAssignment(topologyA, new WorkerSlot[]{
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+            new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+        }));
+        assignmentById.put("topo-B", buildAssignment(topologyB, new WorkerSlot[]{
+            new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3),
+            new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3),
+        }));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put("topo-A", topologyA);
+        topologyById.put("topo-B", topologyB);
+        Topologies topologies = new Topologies(topologyById);
+
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // Each topology has a budget of floor(4/3) = 1, so sup-2 receives one worker from each of the two.
+        // The counts are asserted exactly (rather than >= 1) because that is what enforces round-robin fairness:
+        // an inner loop that let the first topology take both idle slots would leave topo-B with 0 on sup-2.
+        assertEquals(2, usedSlotCount(cluster, "sup-2"));
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-A", "sup-2"));
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-B", "sup-2"));
+        // Both topologies keep their total worker count; only one worker per topology changed host.
+        assertEquals(4, cluster.getAssignedNumWorkers(topologyA));
+        assertEquals(4, cluster.getAssignedNumWorkers(topologyB));
+        // A donor is never emptied completely, otherwise it would become the idle target of the next round.
+        assertTrue(usedSlotCount(cluster, "sup-0") > 0);
+        assertTrue(usedSlotCount(cluster, "sup-1") > 0);
+    }
+
+    /**
+     * Boundary check for the flap guard: 3 stable rounds at a 3s monitor frequency mean a returning supervisor needs at
+     * least 9 seconds of uptime to be eligible. {@code uptime == requiredUptime} is the first value that moves, which
+     * spells out the contract {@code uptimeSecs >= minStableRounds * monitorFrequencySecs}.
+     */
+    @ParameterizedTest
+    @CsvSource({
+        "8, false", // one second short of the threshold: sup-2 is too young and stays idle
+        "9, true", // exactly the threshold: eligible
+        "10, true", // one second past the threshold: eligible
+    })
+    public void flapGuardHonorsMinStableRoundBoundary(long sup2UptimeSecs, boolean expectMove) {
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithUptime(3, 4, 100);
+        supervisors.put("sup-2", supervisor("sup-2", "host-2", 4, sup2UptimeSecs));
+
+        Map<String, Object> settings = evenRebalanceConf(true, 1);
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3);
+        settings.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3);
+        Cluster cluster = buildClusterWithIdleSupervisor(supervisors, settings);
+
+        EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster);
+
+        // sup-2 only becomes an eligible target after 3 rounds * 3 seconds = 9 seconds of uptime; whether a worker
+        // was placed on it is the observable form of that boundary.
+        if (!expectMove) {
+            assertEquals(2, usedSlotCount(cluster, "sup-0"));
+            assertEquals(1, usedSlotCount(cluster, "sup-1"));
+            assertEquals(0, usedSlotCount(cluster, "sup-2"));
+            return;
+        }
+        assertEquals(1, usedSlotCount(cluster, "sup-2"));
+        assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)));
+    }
+
+    @Test
+    public void donorTieBreaksBySupervisorIdWhenWorkerCountsTie() {
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithUptime(3, 4, 100);
+        TopologyDetails tied = makeTopologyDetails("topo-tie", 4, 4);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put("topo-tie", buildAssignment(tied, new WorkerSlot[]{
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+            new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+        }));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put("topo-tie", tied);
+        Topologies topologies = new Topologies(topologyById);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 1));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+        // Both donors hold two workers, so the single relocation allowed by max-free=1 is decided by supervisor id.
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-0"),
+            "sup-0 and sup-1 started with two workers each; lexicographic tie-break chooses sup-0 as donor");
+        assertEquals(2, supervisorWorkerCount(cluster, "topo-tie", "sup-1"));
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-2"));
+        assertEquals(4, cluster.getAssignedNumWorkers(tied),
+            "the tie-break relocation preserves the topology's declared worker count");
+    }
+
+    /**
+     * With two supervisors idle at the same time a single topology may relocate
+     * {@code floor(numWorkers / nonBlacklistedSupervisorCount) * idleSupervisorCount} workers in one round. Here there
+     * are 4 non-blacklisted supervisors (two busy, two idle) and 8 workers, so the budget is
+     * {@code floor(8 / 4) * 2 = 4}; losing the {@code * idleSupervisorCount} factor would yield 2 and move only half as
+     * many workers. No other test in this class relocates workers onto more than one idle supervisor, so this fixture
+     * is the one that pins the multiplier down.
+     */
+    @Test
+    public void twoIdleSupervisors_budgetScalesWithIdleSupervisorCount() {
+        String twoIdleId = "topo-two-idle";
+        TopologyDetails topo = makeTopologyDetails(twoIdleId, 8, 4);
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(4, 4);
+
+        WorkerSlot[] occupied = new WorkerSlot[8];
+        for (int port = 0; port < 4; port++) {
+            occupied[port] = new WorkerSlot("sup-0", port);
+            occupied[4 + port] = new WorkerSlot("sup-1", port);
+        }
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(twoIdleId, buildAssignment(topo, occupied));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(twoIdleId, topo);
+        Topologies topologies = new Topologies(topologyById);
+
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        // Precondition: neither sup-2 nor sup-3 hosts a worker yet.
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+        assertEquals(0, usedSlotCount(cluster, "sup-3"));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // Budget: floor(8 / 4 non-blacklisted) * 2 idle = 4 relocations onto the idle supervisors. The sum is asserted
+        // instead of a single supervisor so the check does not depend on the order the idle slots are filled in.
+        // Without the * idleSupervisorCount factor the budget would be 2 and only 2 workers would move.
+        assertEquals(4, usedSlotCount(cluster, "sup-2") + usedSlotCount(cluster, "sup-3"),
+            "budget must scale with the number of simultaneously-idle supervisors: floor(8/4) * 2 = 4");
+        assertEquals(4, usedSlotCount(cluster, "sup-0") + usedSlotCount(cluster, "sup-1"));
+        assertEquals(8, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(twoIdleId)),
+            "relocation preserves the declared worker count");
+    }
+
+    /**
+     * {@code max.free.per.topology} has to be able to bind more tightly than the even-distribution budget. For 3
+     * supervisors and a 6-worker topology the even budget is {@code floor(6 / 3) * 1 = 2}, while {@code maxFree = 1}
+     * clamps it to one relocation. This is the only fixture in which the {@code Math.min(target, maxFree)} clamp is the
+     * strictly binding constraint: without the clamp 2 workers would move, sup-2 would hold 2, and the assertion below
+     * would fail.
+     */
+    @Test
+    public void maxFreePerTopologyClampsBelowEvenBudget() {
+        String clampId = "topo-clamp";
+        TopologyDetails topo = makeTopologyDetails(clampId, 6, 3);
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+
+        WorkerSlot[] occupied = {
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), new WorkerSlot("sup-0", 2),
+            new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2),
+        };
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(clampId, buildAssignment(topo, occupied));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(clampId, topo);
+        Topologies topologies = new Topologies(topologyById);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 1));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // The even budget is floor(6/3)*1 = 2, yet maxFree=1 limits the round to one relocation. If the
+        // Math.min(target, maxFree) clamp were missing, two workers would move and sup-2 would end up with 2.
+        assertEquals(1, usedSlotCount(cluster, "sup-2"),
+            "max.free.per.topology must clamp the even-distribution budget of 2 down to 1");
+        assertEquals(6, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(clampId)));
+        assertTrue(usedSlotCount(cluster, "sup-0") > 0);
+        assertTrue(usedSlotCount(cluster, "sup-1") > 0);
+    }
+
+    @Test
+    public void blacklistedIdleSupervisorIsNotReusableTarget() {
+        Cluster blacklisted = buildClusterWithIdleSupervisor(true, 1);
+        blacklisted.blacklistHost("host-2");
+
+        assertFalse(blacklisted.isIdleSupervisorAvailableForEvenRebalance(blacklisted.getSupervisorById("sup-2")),
+            "IsolationScheduler represents reserved hosts by blacklisting them before delegating to DefaultScheduler");
+
+        EvenScheduler.scheduleTopologiesEvenly(blacklisted.getTopologies(), blacklisted);
+
+        // sup-2 is idle but blacklisted, so it can never be picked as a target and the 2/1/0 layout must remain.
+        assertEquals(2, usedSlotCount(blacklisted, "sup-0"));
+        assertEquals(1, usedSlotCount(blacklisted, "sup-1"));
+        assertEquals(0, usedSlotCount(blacklisted, "sup-2"));
+        assertEquals(3, blacklisted.getAssignedNumWorkers(firstTopology(blacklisted)));
+    }
+
+    @Test
+    public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() {
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithUptime(3, 4, 100);
+        TopologyDetails isolatedTopo = makeTopologyDetails("topo-isolated", 2, 2);
+        TopologyDetails regularTopo = makeTopologyDetails("topo-regular", 3, 3);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(isolatedTopo.getId(), buildAssignment(isolatedTopo, new WorkerSlot[]{
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+        }));
+        assignmentById.put(regularTopo.getId(), buildAssignment(regularTopo, new WorkerSlot[]{
+            new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2),
+        }));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(isolatedTopo.getId(), isolatedTopo);
+        topologyById.put(regularTopo.getId(), regularTopo);
+        Topologies everyTopology = new Topologies(topologyById);
+        Cluster cluster = newCluster(supervisors, assignmentById, everyTopology, evenRebalanceConf(true, 0));
+
+        cluster.blacklistHost("host-0");
+        DefaultScheduler.defaultSchedule(new Topologies(regularTopo), cluster);
+
+        assertEquals(2, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-0"),
+            "the isolated topology is not in the leftover topology set, so DefaultScheduler must not move it");
+        assertEquals(0, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-2"));
+        assertEquals(2, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-1"));
+        assertEquals(1, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-2"));
+        // Declared worker counts survive for both: the leftover topology was relocated, the excluded one left alone.
+        assertEquals(3, cluster.getAssignedNumWorkers(regularTopo));
+        assertEquals(2, cluster.getAssignedNumWorkers(isolatedTopo));
+    }
+
+    @Test
+    public void isolationSchedulerOnlyRelocatesLeftoverTopologyOntoNonIsolatedIdleSupervisor() {
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithUptime(3, 4, 100);
+        TopologyDetails isolatedTopo = makeTopologyDetails("topo-isolated", 1, 1);
+        TopologyDetails regularTopo = makeTopologyDetails("topo-regular", 3, 3);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(isolatedTopo.getId(), buildAssignment(isolatedTopo, new WorkerSlot[]{
+            new WorkerSlot("sup-0", 0),
+        }));
+        assignmentById.put(regularTopo.getId(), buildAssignment(regularTopo, new WorkerSlot[]{
+            new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2),
+        }));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(isolatedTopo.getId(), isolatedTopo);
+        topologyById.put(regularTopo.getId(), regularTopo);
+        Topologies topologies = new Topologies(topologyById);
+
+        Map<String, Object> settings = evenRebalanceConf(true, 0);
+        settings.put(DaemonConfig.ISOLATION_SCHEDULER_MACHINES, Collections.singletonMap(isolatedTopo.getName(), 1));
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, settings);
+
+        IsolationScheduler isolation = new IsolationScheduler();
+        isolation.prepare(settings, new StormMetricsRegistry());
+        isolation.schedule(topologies, cluster);
+
+        assertEquals(1, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-0"),
+            "the already-isolated topology remains on its isolated host and is not selected as a donor");
+        assertEquals(0, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-2"));
+        assertEquals(2, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-1"));
+        assertEquals(1, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-2"),
+            "only the leftover regular topology is allowed to move onto the non-isolated idle supervisor");
+        // Declared worker counts are intact for the relocated leftover topology and the untouched isolated one.
+        assertEquals(3, cluster.getAssignedNumWorkers(regularTopo));
+        assertEquals(1, cluster.getAssignedNumWorkers(isolatedTopo));
+    }
+
+    /**
+     * IsolationScheduler reserves a host by blacklisting it and then hands the remaining (non-isolated) topologies to
+     * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}. In this fixture the isolated topology is down, i.e.
+     * it has no assigned workers, yet its reserved host (sup-2) must not count as an idle relocation target despite
+     * having zero used slots. The leftover regular topology is expected to be rebalanced onto sup-3, which is idle and
+     * not reserved, and never onto sup-2.
+     */
+    @Test
+    public void reservedHostForDownIsolatedTopologyIsNotTreatedAsIdle() {
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithUptime(4, 4, 100);
+        TopologyDetails isolatedTopo = makeTopologyDetails("topo-isolated", 1, 1);
+        TopologyDetails regularTopo = makeTopologyDetails("topo-regular", 4, 4);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        // Only the regular topology gets an assignment; the isolated one is down and has none.
+        assignmentById.put(regularTopo.getId(), buildAssignment(regularTopo, new WorkerSlot[]{
+            new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+            new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+        }));
+
+        Map<String, TopologyDetails> topologyById = new HashMap<>();
+        topologyById.put(isolatedTopo.getId(), isolatedTopo);
+        topologyById.put(regularTopo.getId(), regularTopo);
+        Topologies everyTopology = new Topologies(topologyById);
+        Cluster cluster = newCluster(supervisors, assignmentById, everyTopology, evenRebalanceConf(true, 0));
+
+        // Mimic IsolationScheduler reserving sup-2 for the (down) isolated topology by blacklisting its host.
+        cluster.blacklistHost("host-2");
+
+        assertFalse(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-2")),
+            "a blacklisted reserved host is never an even-rebalance target, even with zero used slots");
+        assertTrue(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-3")),
+            "the non-reserved idle supervisor is available");
+
+        // Same hand-off IsolationScheduler performs: only the leftover (non-isolated) topologies reach
+        // DefaultScheduler, and the reserved host is already blacklisted at that point.
+        DefaultScheduler.defaultSchedule(new Topologies(regularTopo), cluster);
+
+        assertEquals(0, usedSlotCount(cluster, "sup-2"),
+            "the reserved host stays idle: the down isolated topology's machine is not repopulated by rebalance");
+        assertEquals(0, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-2"));
+        assertEquals(1, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-3"),
+            "the leftover regular topology rebalances onto the genuinely idle, non-reserved supervisor");
+        assertEquals(1, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-0"));
+        assertEquals(2, supervisorWorkerCount(cluster, regularTopo.getId(), "sup-1"));
+        assertEquals(4, cluster.getAssignedNumWorkers(regularTopo),
+            "the leftover topology keeps all 4 workers; the move never loses executors");
+        assertEquals(0, cluster.getUsedSlotsByTopologyId(isolatedTopo.getId()).size(),
+            "the isolated topology is down and is never scheduled by the leftover path");
+    }
+
+    /**
+     * Builds an assignment for {@code topology} that distributes its executors round-robin over {@code slots}.
+     *
+     * <p>Executors are ordered by ascending start task so the mapping is deterministic; the executor at position
+     * {@code i} of that ordering is placed on {@code slots[i % slots.length]}.
+     *
+     * @param topology the topology whose executors are assigned
+     * @param slots    the worker slots to cycle through; must be non-empty if the topology has any executor,
+     *                 otherwise the modulo by {@code slots.length} throws {@link ArithmeticException}
+     * @return a new assignment keyed by the topology id; the two trailing constructor arguments are passed as
+     *         {@code null}
+     */
+    private SchedulerAssignmentImpl buildAssignment(TopologyDetails topology, WorkerSlot[] slots) {
+        List<ExecutorDetails> execs = new LinkedList<>(topology.getExecutors());
+        execs.sort((a, b) -> Integer.compare(a.getStartTask(), b.getStartTask()));
+        Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
+        // Walk the list with its iterator and keep a running position: calling get(i) on a LinkedList
+        // re-traverses the list on every call, which made the original loop quadratic.
+        int position = 0;
+        for (ExecutorDetails exec : execs) {
+            executorToSlot.put(exec, slots[position % slots.length]);
+            position++;
+        }
+        return new SchedulerAssignmentImpl(topology.getId(), executorToSlot, null, null);
+    }
+
+    /**
+     * Counts how many of the slots used by the given topology sit on the given supervisor.
+     *
+     * @param cluster      the cluster whose used slots are inspected
+     * @param topologyId   id of the topology whose used slots are examined
+     * @param supervisorId node id a slot must carry to be counted
+     * @return the number of used slots of the topology whose node id equals {@code supervisorId}
+     */
+    private int supervisorWorkerCount(Cluster cluster, String topologyId, String supervisorId) {
+        return (int) cluster.getUsedSlotsByTopologyId(topologyId).stream()
+            .filter(used -> used.getNodeId().equals(supervisorId))
+            .count();
+    }
+}