Merge pull request #8783 from GGraziadei/8710-flux-component-conf

Allow per-component configuration in Flux topology
diff --git a/.github/workflows/cypress-tests.yml b/.github/workflows/cypress-tests.yml
index 5f39e7e..26621b3 100644
--- a/.github/workflows/cypress-tests.yml
+++ b/.github/workflows/cypress-tests.yml
@@ -33,7 +33,7 @@
       run:
         working-directory: storm-webapp
     steps:
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
 
       - name: Set up Node.js
         uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 98666df..db1753c 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -27,7 +27,7 @@
     runs-on: ubuntu-latest
     timeout-minutes: 30
     steps:
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
       - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
         with:
           path: ~/.m2/repository
@@ -71,7 +71,7 @@
         experimental: [false]
       fail-fast: false
     steps:
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
       - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
         with:
           path: ~/.m2/repository
diff --git a/.github/workflows/nightlies.yaml b/.github/workflows/nightlies.yaml
index f4ffcda..505c702 100644
--- a/.github/workflows/nightlies.yaml
+++ b/.github/workflows/nightlies.yaml
@@ -29,7 +29,7 @@
     name: Upload to Nightly Builds
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
       - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
         with:
           path: ~/.m2/repository
diff --git a/.github/workflows/snapshots.yaml b/.github/workflows/snapshots.yaml
index 37ebf40..73f514f 100644
--- a/.github/workflows/snapshots.yaml
+++ b/.github/workflows/snapshots.yaml
@@ -29,7 +29,7 @@
     name: Publish Snapshots
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
       - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
         with:
           path: ~/.m2/repository
diff --git a/.github/workflows/update-license-files.yml b/.github/workflows/update-license-files.yml
index 24a11b1..555bc61 100644
--- a/.github/workflows/update-license-files.yml
+++ b/.github/workflows/update-license-files.yml
@@ -28,7 +28,7 @@
   update-license-files:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3
 
       - uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
         with:
diff --git a/AGENTS.md b/AGENTS.md
new file mode 100644
index 0000000..c6a5b2f
--- /dev/null
+++ b/AGENTS.md
@@ -0,0 +1,3 @@
+## Security
+
+Before reporting issues, see the [Security Model](https://storm.apache.org/security-model.html) for the project's threat model, in-scope / out-of-scope declarations, and known non-findings.
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index d5d6bb4..6f55bae 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -94,6 +94,9 @@
 nimbus.credential.renewers.freq.secs: 600
 nimbus.queue.size: 100000
 scheduler.display.resource: false
+nimbus.even.rebalance.idle.supervisor.enabled: false
+nimbus.even.rebalance.max.free.per.topology: 0
+nimbus.even.rebalance.idle.supervisor.min.stable.rounds: 3
 nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend"
 nimbus.assignments.service.threads: 10
 nimbus.assignments.service.thread.queue.size: 100
diff --git a/pom.xml b/pom.xml
index 5926042..0724796 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,6 +227,7 @@
                                 <!-- exclude CHANGELOG, VERSION, AND TODO files -->
                                 <exclude>**/CHANGELOG.md</exclude>
                                 <exclude>**/README.md</exclude>
+                                <exclude>**/AGENTS.md</exclude>
                                 <exclude>**/README.markdown</exclude>
                                 <exclude>**/DEVELOPER.md</exclude>
                                 <exclude>**/RELEASING.md</exclude>
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 15924b1..6dcc2eb 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -176,6 +176,37 @@
     public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource";
 
     /**
+     * If true, {@link org.apache.storm.scheduler.EvenScheduler} may move already-assigned workers onto non-blacklisted supervisors
+     * with no slot in use. This lets a freshly returned supervisor pick up workers instead of staying idle. The number of workers
+     * freed per topology in a single scheduling round is capped by {@link #NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY}, so even
+     * distribution is approached gradually rather than rebuilt from scratch.
+     */
+    @IsBoolean
+    public static final String NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED
+            = "nimbus.even.rebalance.idle.supervisor.enabled";
+
+    /**
+     * Optional upper bound on the number of currently-assigned workers a single topology may release in one scheduling round
+     * when the idle-supervisor rebalance defined by {@link #NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} kicks in. The
+     * default budget already targets an even per-supervisor distribution (idle supervisors absorb roughly {@code numWorkers /
+     * numSupervisors} workers each in one round), capped by the idle side's free slot capacity. Setting this to a positive
+     * value tightens that budget; setting it to {@code 0} or a negative value applies no additional cap on top of it.
+     */
+    @IsInteger
+    public static final String NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY
+            = "nimbus.even.rebalance.max.free.per.topology";
+
+    /**
+     * Minimum number of consecutive supervisor monitor rounds that a fully-idle supervisor must have been alive before
+     * {@link org.apache.storm.scheduler.EvenScheduler} can relocate workers onto it. A positive value avoids moving workers onto a
+     * supervisor that has only just returned and may still be flapping. Setting this to {@code 0} or a negative value disables the
+     * uptime guard.
+     */
+    @IsInteger
+    public static final String NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS
+            = "nimbus.even.rebalance.idle.supervisor.min.stable.rounds";
+
+    /**
      * The directory where storm's health scripts go.
      */
     @IsString
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 103eb00..0463c78 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -985,11 +985,20 @@
             String id = entry.getKey();
             SupervisorInfo info = entry.getValue();
             ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(),
-                                              info.get_scheduler_meta(), null, info.get_resources_map()));
+                                              info.get_scheduler_meta(), null, info.get_resources_map(),
+                                              supervisorUptimeSecs(info)));
         }
         return ret;
     }
 
+    private static long supervisorUptimeSecs(SupervisorInfo info) {
+        // Conservative production default: a supervisor whose uptime is unset is reported as 0L -- unlike the Long.MAX_VALUE
+        // the uptime-less SupervisorDetails constructors fall back to -- so a freshly (re)registered supervisor has to
+        // accrue real uptime before Cluster#hasMinimumIdleSupervisorStability lets the idle rebalance target it.
+        boolean uptimeReported = info.is_set_uptime_secs();
+        return uptimeReported ? info.get_uptime_secs() : 0L;
+    }
+
     /**
      * NOTE: this can return false when a topology has just been activated.  The topology may still be
      * in the STORMS_SUBTREE.
@@ -2273,7 +2282,8 @@
         List<SupervisorDetails> superDetails = new ArrayList<>();
         for (Entry<String, SupervisorInfo> entry : superInfos.entrySet()) {
             SupervisorInfo info = entry.getValue();
-            superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map()));
+            superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map(),
+                                                   supervisorUptimeSecs(info)));
         }
         // Note that allSlotsAvailableForScheduling
         // only uses the supervisor-details. The rest of the arguments
@@ -2306,7 +2316,7 @@
                 allPorts.removeAll(deadPorts);
             }
             ret.put(superId, new SupervisorDetails(superId, hostname, info.get_scheduler_meta(),
-                                                   allPorts, info.get_resources_map()));
+                                                   allPorts, info.get_resources_map(), supervisorUptimeSecs(info)));
         }
         return ret;
     }
@@ -5526,4 +5536,3 @@
         }
     }
 }
-
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 1a8879d..c01b6fb 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -363,6 +363,42 @@
         return desiredNumWorkers > assignedNumWorkers || getUnassignedExecutors(topology).size() > 0;
     }
 
+    /**
+     * Decides whether {@code supervisor} may receive relocated workers during the {@link EvenScheduler} idle-rebalance
+     * pass. That is the case only for a non-null supervisor that is not blacklisted, advertises at least one port, has
+     * no port in use, and passes the uptime guard of {@link #hasMinimumIdleSupervisorStability(SupervisorDetails)},
+     * which holds back a supervisor that has only just returned and may still be flapping. The "no port in use" test is
+     * all or nothing, so a supervisor that is merely lightly loaded never qualifies. The opt-in switch
+     * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} is not consulted here; the caller,
+     * {@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)}, checks it once up front.
+     */
+    public boolean isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails supervisor) {
+        // A missing or blacklisted supervisor is never a relocation target.
+        if (supervisor == null || isBlackListed(supervisor.getId())) {
+            return false;
+        }
+        // It has to offer at least one port ...
+        boolean offersPorts = !supervisor.getAllPorts().isEmpty();
+        // ... and none of its ports may currently be in use (only checked when there are ports at all).
+        boolean fullyFree = offersPorts && getUsedPorts(supervisor).isEmpty();
+        if (!fullyFree) {
+            return false;
+        }
+        // Last gate: the supervisor must have been up long enough to count as stable.
+        return hasMinimumIdleSupervisorStability(supervisor);
+    }
+
+    private boolean hasMinimumIdleSupervisorStability(SupervisorDetails supervisor) {
+        // Flap guard: uptime must cover the configured number of monitor periods; a non-positive count disables the guard.
+        Object roundsSetting = conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS);
+        int rounds = ObjectReader.getInt(roundsSetting, 3);
+        if (rounds <= 0) {
+            return true;
+        }
+        int periodSecs = Math.max(1, ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS), 3));
+        return supervisor.getUptimeSecs() >= (long) rounds * periodSecs;
+    }
+
     @Override
     public boolean needsSchedulingRas(TopologyDetails topology) {
         return getUnassignedExecutors(topology).size() > 0;
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index 81a0ad8..7f6cc9c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -72,7 +72,16 @@
     }
 
     public static void defaultSchedule(Topologies topologies, Cluster cluster) {
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
         for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
+            // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
+            // topologies passed in: DefaultScheduler.schedule passes the full set (so the guard is a no-op), while
+            // IsolationScheduler delegates only its leftover, non-isolated topologies here. redistributeOntoIdleSupervisors
+            // above acted only on that passed-in set too. Skip topologies outside it so the leftover path never schedules
+            // one the caller excluded -- e.g. a down isolated topology on a reserved host.
+            if (topologies.getById(topology.getId()) == null) {
+                continue;
+            }
             List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
             Set<ExecutorDetails> allExecutors = topology.getExecutors();
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index ccc2e34..69d62d6 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -18,18 +18,23 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -101,6 +106,186 @@
         return Utils.reverseMap(executorToSlot);
     }
 
+    /**
+     * Moves already-assigned workers onto fully-idle supervisors, one worker per topology per round-robin pass, so that
+     * several topologies share the idle slots instead of one topology claiming them all.
+     *
+     * <p>Opt-in: returns immediately unless {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} is true.
+     *
+     * <p>Relocation targets are the ports of every supervisor accepted by
+     * {@link Cluster#isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails)}. Blacklisted supervisors and supervisors
+     * without ports are ignored and do not count toward {@code nonBlacklistedSupervisorCount}. Without any target nothing
+     * is moved, so a cluster that has no fully-idle supervisor sees no movement.
+     *
+     * <p>Each topology in {@code topologies} that passes {@link #topologyCanReuseIdleSupervisor} gets a budget of
+     * {@code idleSupervisorCount * floor(numWorkers / nonBlacklistedSupervisorCount)} relocations, lowered to
+     * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY} when that setting is positive and smaller.
+     * {@code numWorkers} is {@link TopologyDetails#getNumWorkers()}; a topology whose budget is zero is skipped. The budget
+     * is an upper bound only: once {@link #relocateOneWorkerOntoIdleSlot} reports that it could not move a worker for a
+     * topology, that topology's remaining budget is dropped to zero.
+     *
+     * <p>Topologies are visited in ascending id order; the passes stop when the idle slots run out or when a full pass
+     * moves nothing, and an INFO line is logged if at least one worker was relocated. The method is package-private
+     * because it is called from {@link #scheduleTopologiesEvenly(Topologies, Cluster)} and
+     * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}.
+     *
+     * @param topologies the topologies this scheduling run may relocate workers for
+     * @param cluster    the cluster state, read and updated in place
+     */
+    static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) {
+        if (!ObjectReader.getBoolean(
+                cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) {
+            return;
+        }
+        int nonBlacklistedSupervisorCount = 0;
+        int idleSupervisorCount = 0;
+        // Queue order is supervisor id, then port (both ascending): one idle supervisor's ports are used up before the next's.
+        Deque<WorkerSlot> idleTargets = new ArrayDeque<>();
+        Set<String> idleSupervisorIds = new HashSet<>();
+        List<SupervisorDetails> supervisors = new ArrayList<>(cluster.getSupervisors().values());
+        supervisors.sort(Comparator.comparing(SupervisorDetails::getId));
+        for (SupervisorDetails s : supervisors) {
+            if (cluster.isBlackListed(s.getId())) {
+                continue;
+            }
+            if (s.getAllPorts().isEmpty()) {
+                continue;
+            }
+            nonBlacklistedSupervisorCount++;
+            if (cluster.isIdleSupervisorAvailableForEvenRebalance(s)) {
+                idleSupervisorCount++;
+                idleSupervisorIds.add(s.getId());
+                List<Integer> ports = new ArrayList<>(s.getAllPorts());
+                Collections.sort(ports);
+                for (Integer port : ports) {
+                    idleTargets.add(new WorkerSlot(s.getId(), port));
+                }
+            }
+        }
+        if (idleTargets.isEmpty() || nonBlacklistedSupervisorCount == 0 || idleSupervisorCount == 0) {
+            return;
+        }
+
+        int maxFree = ObjectReader.getInt(
+                cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY), 0);
+
+        List<TopologyDetails> orderedTopos = new ArrayList<>();
+        Map<String, Integer> remainingBudget = new HashMap<>();
+        for (TopologyDetails topo : topologies.getTopologies()) {
+            // Skip topologies already present on every idle supervisor -- relocating gains them no workload diversity.
+            // Reuses the idle-supervisor set computed above instead of re-scanning every supervisor for each topology.
+            if (!topologyCanReuseIdleSupervisor(cluster, topo, idleSupervisorIds)) {
+                continue;
+            }
+            int target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCount;
+            if (target <= 0) {
+                continue;
+            }
+            if (maxFree > 0) {
+                target = Math.min(target, maxFree);
+            }
+            orderedTopos.add(topo);
+            remainingBudget.put(topo.getId(), target);
+        }
+        if (orderedTopos.isEmpty()) {
+            return;
+        }
+        orderedTopos.sort(Comparator.comparing(TopologyDetails::getId));
+
+        int totalRelocated = 0;
+        while (!idleTargets.isEmpty()) {
+            boolean movedThisIteration = false;
+            for (TopologyDetails topo : orderedTopos) {
+                if (idleTargets.isEmpty()) {
+                    break;
+                }
+                int remaining = remainingBudget.getOrDefault(topo.getId(), 0);
+                if (remaining <= 0) {
+                    continue;
+                }
+                if (relocateOneWorkerOntoIdleSlot(topo, cluster, idleTargets)) {
+                    remainingBudget.put(topo.getId(), remaining - 1);
+                    totalRelocated++;
+                    movedThisIteration = true;
+                } else {
+                    remainingBudget.put(topo.getId(), 0);
+                }
+            }
+            if (!movedThisIteration) {
+                break;
+            }
+        }
+        if (totalRelocated > 0) {
+            LOG.info("EvenScheduler: relocated {} worker(s) onto idle supervisor(s) round-robin across {} topologies.",
+                    totalRelocated, orderedTopos.size());
+        }
+    }
+
+    /**
+     * Tells whether {@code topology} is absent from at least one of the given idle supervisors, i.e. whether relocating
+     * onto the idle set could spread it across more supervisors.
+     *
+     * <p>Returns false when {@code idleSupervisorIds} is empty or when the topology already occupies a slot on every
+     * listed supervisor. Only the {@code idleSupervisorIds} handed in are inspected (the caller has already vetted each
+     * of them through {@link Cluster#isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails)}), so no supervisor
+     * is rescanned here.
+     */
+    private static boolean topologyCanReuseIdleSupervisor(Cluster cluster, TopologyDetails topology,
+                                                          Set<String> idleSupervisorIds) {
+        // Supervisors (node ids) on which this topology currently holds at least one slot.
+        Set<String> occupiedNodeIds = new HashSet<>();
+        cluster.getUsedSlotsByTopologyId(topology.getId())
+               .forEach(usedSlot -> occupiedNodeIds.add(usedSlot.getNodeId()));
+        // An idle supervisor is still free of this topology exactly when the occupied set does not cover every
+        // idle id.
+        boolean presentOnEveryIdleSupervisor = occupiedNodeIds.containsAll(idleSupervisorIds);
+        return !presentOnEveryIdleSupervisor;
+    }
+
+    /**
+     * Relocates one worker of {@code topology} onto the idle slot at the head of {@code idleTargets}. The donor is the
+     * supervisor running the most workers of the topology (ties: lexicographically smallest id); it must run at least
+     * two of them so that it is never emptied, and it gives up its worker on the highest port. Returns false, leaving
+     * {@code idleTargets} untouched, when there is no such donor or nothing to move.
+     */
+    private static boolean relocateOneWorkerOntoIdleSlot(TopologyDetails topology, Cluster cluster,
+                                                         Deque<WorkerSlot> idleTargets) {
+        Map<WorkerSlot, List<ExecutorDetails>> executorsBySlot =
+                getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
+        Map<String, List<WorkerSlot>> slotsByNode = new HashMap<>();
+        for (WorkerSlot assignedSlot : executorsBySlot.keySet()) {
+            slotsByNode.computeIfAbsent(assignedSlot.getNodeId(), nodeId -> new ArrayList<>()).add(assignedSlot);
+        }
+        // Pick the donor: most workers first, smallest supervisor id on a tie, and never a single-worker node.
+        String donorId = null;
+        List<WorkerSlot> donorSlots = null;
+        for (Map.Entry<String, List<WorkerSlot>> node : slotsByNode.entrySet()) {
+            int workers = node.getValue().size();
+            if (workers < 2) {
+                continue;
+            }
+            boolean wins = donorSlots == null || workers > donorSlots.size()
+                    || (workers == donorSlots.size() && node.getKey().compareTo(donorId) < 0);
+            if (wins) {
+                donorId = node.getKey();
+                donorSlots = node.getValue();
+            }
+        }
+        if (donorSlots == null) {
+            return false;
+        }
+        WorkerSlot victim = Collections.max(donorSlots, Comparator.comparingInt(WorkerSlot::getPort));
+        Collection<ExecutorDetails> execs = executorsBySlot.get(victim);
+        if (execs == null || execs.isEmpty() || idleTargets.isEmpty()) {
+            return false;
+        }
+        // Free first, then assign: every queued target comes from a supervisor with no used port, so it is unoccupied.
+        WorkerSlot target = idleTargets.poll();
+        cluster.freeSlot(victim);
+        cluster.assign(target, topology.getId(), execs);
+        return true;
+    }
+
     private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
         List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
         Set<ExecutorDetails> allExecutors = topology.getExecutors();
@@ -148,7 +333,16 @@
     }
 
     public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
+        redistributeOntoIdleSupervisors(topologies, cluster);
         for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
+            // needsSchedulingTopologies() returns the cluster's full topology set, but this run is scoped to the
+            // topologies passed in: EvenScheduler.schedule passes the full set (so the guard is a no-op), while
+            // DefaultScheduler.defaultSchedule calls us once per leftover topology with a single-topology Topologies.
+            // redistributeOntoIdleSupervisors above acted only on that passed-in set too. Skip topologies outside it so
+            // the leftover path never schedules one the caller excluded -- e.g. a down isolated topology on a reserved host.
+            if (topologies.getById(topology.getId()) == null) {
+                continue;
+            }
             String topologyId = topology.getId();
             Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
             Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
index 1886273..f03e1de 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java
@@ -46,6 +46,7 @@
      * all the ports of the supervisor.
      */
     private Set<Integer> allPorts;
+    private final long uptimeSecs;
 
     /**
      * Create the details of a new supervisor.
@@ -59,12 +60,22 @@
      */
     public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta,
                              Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
+        // Callers that do not supply uptime (tests and every path other than the idle-supervisor rebalance) default to
+        // Long.MAX_VALUE, i.e. treated as indefinitely stable so the rebalance flap guard never holds them back. This is
+        // the deliberate opposite of Nimbus#supervisorUptimeSecs, which maps an unset uptime to 0L; production always
+        // supplies the real uptime through that path, so this default only affects non-feature callers.
+        this(id, serverPort, host, meta, schedulerMeta, allPorts, totalResources, Long.MAX_VALUE);
+    }
+
+    public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta,
+                             Collection<? extends Number> allPorts, Map<String, Double> totalResources, long uptimeSecs) {
 
         this.id = id;
         this.serverPort = serverPort;
         this.host = host;
         this.meta = meta;
         this.schedulerMeta = schedulerMeta;
+        this.uptimeSecs = uptimeSecs;
         if (allPorts != null) {
             setAllPorts(allPorts);
         } else {
@@ -82,6 +93,10 @@
         this(id, null, null, meta, null, null, totalResources);
     }
 
+    public SupervisorDetails(String id, Object meta, Map<String, Double> totalResources, long uptimeSecs) {
+        this(id, null, null, meta, null, null, totalResources, uptimeSecs); // no server port, host, scheduler meta or ports
+    }
+
     public SupervisorDetails(String id, Object meta, Collection<? extends Number> allPorts) {
         this(id, null, null, meta, null, allPorts, null);
     }
@@ -95,11 +110,21 @@
         this(id, null, host, null, schedulerMeta, allPorts, totalResources);
     }
 
+    public SupervisorDetails(String id, String host, Object schedulerMeta,
+                             Collection<? extends Number> allPorts, Map<String, Double> totalResources, long uptimeSecs) {
+        this(id, null, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs); // server port and meta stay null
+    }
+
     public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta,
                              Collection<? extends Number> allPorts, Map<String, Double> totalResources) {
         this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources);
     }
 
+    public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta,
+                             Collection<? extends Number> allPorts, Map<String, Double> totalResources, long uptimeSecs) {
+        this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs); // meta stays null
+    }
+
     @Override
     public String toString() {
         return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta
@@ -126,6 +151,10 @@
         return allPorts;
     }
 
+    public long getUptimeSecs() {
+        return uptimeSecs; // value passed at construction; Long.MAX_VALUE if the constructor used took no uptime
+    }
+
     private void setAllPorts(Collection<? extends Number> allPorts) {
         this.allPorts = new HashSet<>();
         if (allPorts != null) {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java
new file mode 100644
index 0000000..c5849c6
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.blacklist.TestUtilsForBlacklistScheduler;
+import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for the idle-supervisor rebalance behavior added to
+ * {@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)} and its per-supervisor eligibility predicate
+ * {@link Cluster#isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails)}.
+ *
+ * <p>The trigger condition is binary: at least one non-blacklisted supervisor with zero used slots must exist. A cluster
+ * that is merely "almost balanced" never triggers the new logic, so a near-even distribution is preserved as-is. Each
+ * round frees at most {@code nimbus.even.rebalance.max.free.per.topology} workers per topology and never drains a
+ * supervisor down to zero. The tests assert primarily on the observable effect of
+ * {@code redistributeOntoIdleSupervisors} (the resulting assignment) rather than on internal boolean predicates.
+ */
+public class TestEvenSchedulerIdleSupervisor {
+
+    private static final String TOPO_ID = "topo-1";
+
+    /**
+     * sup-0 (2 workers) and sup-1 (1 worker) host the topology; sup-2 has no assignment and is the idle rebalance target.
+     */
+    private Cluster buildClusterWithIdleSupervisor(boolean enableRebalance, int maxFreePerTopology) {
+        Map<String, Object> conf = evenRebalanceConf(enableRebalance, maxFreePerTopology);
+        return buildClusterWithIdleSupervisor(TestUtilsForBlacklistScheduler.genSupervisors(3, 4), conf);
+    }
+
+    private Cluster buildClusterWithIdleSupervisor(Map<String, SupervisorDetails> supMap, Map<String, Object> conf) {
+        // A 3-worker topology occupies sup-0:0, sup-0:1 and sup-1:0; no slot on sup-2 is used, so it stays idle.
+        TopologyDetails topo = makeTopologyDetails(TOPO_ID, 3);
+        WorkerSlot[] ring = {
+                new WorkerSlot("sup-0", 0),
+                new WorkerSlot("sup-0", 1),
+                new WorkerSlot("sup-1", 0),
+        };
+
+        // Order executors by start task so the executor-to-slot mapping is deterministic.
+        List<ExecutorDetails> ordered = new LinkedList<>(topo.getExecutors());
+        ordered.sort((left, right) -> Integer.compare(left.getStartTask(), right.getStartTask()));
+
+        // Deal the executors onto the slots round-robin, so each slot gets at least one.
+        Map<ExecutorDetails, WorkerSlot> placement = new HashMap<>();
+        int next = 0;
+        for (ExecutorDetails exec : ordered) {
+            placement.put(exec, ring[next++ % ring.length]);
+        }
+
+        Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>();
+        assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, placement, null, null));
+
+        Map<String, TopologyDetails> byId = new HashMap<>();
+        byId.put(TOPO_ID, topo);
+        return newCluster(supMap, assignments, new Topologies(byId), conf);
+    }
+
+    private Map<String, Object> evenRebalanceConf(boolean enableRebalance, int maxFreePerTopology) {
+        Map<String, Object> settings = new HashMap<>(); // min stable rounds is 0 here; tests override it when needed
+        settings.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3);
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 0);
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, maxFreePerTopology);
+        settings.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, enableRebalance);
+        return settings;
+    }
+
+    private Map<String, SupervisorDetails> genSupervisorsWithUptime(int numSup, int numPorts, long uptimeSecs) {
+        Map<String, SupervisorDetails> byId = new HashMap<>(); // sup-N on host-N, identical port count and uptime
+        for (int idx = 0; idx < numSup; idx++) {
+            SupervisorDetails created = supervisor("sup-" + idx, "host-" + idx, numPorts, uptimeSecs);
+            byId.put(created.getId(), created);
+        }
+        return byId;
+    }
+
+    private SupervisorDetails supervisor(String id, String host, int numPorts, long uptimeSecs) {
+        List<Number> portNumbers = new LinkedList<>(); // ports 0 .. numPorts - 1
+        for (int next = 0; next < numPorts; next++) {
+            portNumbers.add(next);
+        }
+        return new SupervisorDetails(id, host, null, portNumbers, null, uptimeSecs);
+    }
+
+    private Cluster newCluster(Map<String, SupervisorDetails> supMap,
+                               Map<String, SchedulerAssignmentImpl> assignments,
+                               Topologies topologies,
+                               Map<String, Object> conf) {
+        // Every cluster is backed by its own fresh metrics registry and a new INimbusTest helper instance.
+        ResourceMetrics metrics = new ResourceMetrics(new StormMetricsRegistry());
+        TestUtilsForBlacklistScheduler.INimbusTest nimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        return new Cluster(nimbus, metrics, supMap, assignments, topologies, conf);
+    }
+
+    private TopologyDetails makeTopologyDetails(String id, int numWorkers, int parallelism) {
+        // One spout feeding one shuffle-grouped bolt, both at the requested parallelism; the topology name is its id.
+        TopologyBuilder topoBuilder = new TopologyBuilder();
+        topoBuilder.setSpout("spout-0", new TestUtilsForBlacklistScheduler.TestSpout(), parallelism);
+        topoBuilder.setBolt("bolt-0", new TestUtilsForBlacklistScheduler.TestBolt(), parallelism)
+                .shuffleGrouping("spout-0");
+        StormTopology built = topoBuilder.createTopology();
+
+        Config topoConf = new Config();
+        topoConf.put(Config.TOPOLOGY_WORKERS, numWorkers);
+        topoConf.put(Config.TOPOLOGY_NAME, id);
+        return new TopologyDetails(id, topoConf, built, numWorkers,
+                TestUtilsForBlacklistScheduler.genExecsAndComps(built, parallelism, parallelism), 0, "user");
+    }
+
+    private TopologyDetails makeTopologyDetails(String id, int numWorkers) {
+        return makeTopologyDetails(id, numWorkers, 3); // default parallelism of 3 for both the spout and the bolt
+    }
+
+    private TopologyDetails firstTopology(Cluster cluster) {
+        return cluster.getTopologies().getById(TOPO_ID); // looks up the topology registered under TOPO_ID
+    }
+
+    private int usedSlotCount(Cluster cluster, String supervisorId) {
+        // A supervisor's used-slot count is the number of ports the cluster reports as in use on it.
+        return cluster.getUsedPorts(cluster.getSupervisorById(supervisorId)).size();
+    }
+
+    @Test
+    public void disabledByDefault_doesNotTrigger() {
+        // sup-2 is idle in this fixture, but the feature flag is explicitly switched off.
+        Cluster fixture = buildClusterWithIdleSupervisor(false, 1);
+        EvenScheduler.redistributeOntoIdleSupervisors(fixture.getTopologies(), fixture);
+
+        // With the flag off nothing may be relocated: sup-0 and sup-1 keep their workers and sup-2 stays empty.
+        assertEquals(2, usedSlotCount(fixture, "sup-0"));
+        assertEquals(1, usedSlotCount(fixture, "sup-1"));
+        assertEquals(0, usedSlotCount(fixture, "sup-2"));
+        assertFalse(fixture.needsScheduling(firstTopology(fixture)),
+                "needsScheduling must remain false when the new behavior is disabled and the topology is fully assigned");
+    }
+
+    @Test
+    public void enabledWithIdleSupervisor_doesNotChangeGenericNeedsScheduling() {
+        Cluster fixture = buildClusterWithIdleSupervisor(true, 1);
+
+        // Before the rebalance: turning the opt-in feature on must not flip the generic scheduling triggers.
+        assertFalse(fixture.needsScheduling(firstTopology(fixture)),
+                "needsScheduling is used by schedulers other than EvenScheduler; the idle trigger stays out of that generic path");
+        assertFalse(fixture.needsSchedulingRas(firstTopology(fixture)),
+                "ResourceAwareScheduler keeps using needsSchedulingRas, so this opt-in EvenScheduler feature is out of RAS scope");
+
+        EvenScheduler.redistributeOntoIdleSupervisors(fixture.getTopologies(), fixture);
+
+        // The rebalance itself still happens: exactly one worker lands on the idle supervisor...
+        assertEquals(1, usedSlotCount(fixture, "sup-2"));
+        assertEquals(3, fixture.getAssignedNumWorkers(firstTopology(fixture)));
+        // ...and because the topology stays fully assigned, the generic triggers remain false afterwards.
+        assertFalse(fixture.needsScheduling(firstTopology(fixture)));
+        assertFalse(fixture.needsSchedulingRas(firstTopology(fixture)));
+    }
+
+    @Test
+    public void noIdleSupervisor_doesNotTrigger() {
+        // Only two supervisors exist and each already runs workers of the topology, so none of them is idle.
+        TopologyDetails topo = makeTopologyDetails(TOPO_ID, 3);
+        WorkerSlot[] occupied = {
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), new WorkerSlot("sup-1", 0),
+        };
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(TOPO_ID, buildAssignment(topo, occupied));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(TOPO_ID, topo);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(2, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, new Topologies(topoById), evenRebalanceConf(true, 1));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(cluster.getTopologies(), cluster);
+
+        // Without a zero-slot supervisor the trigger cannot fire, so the original placement must survive untouched.
+        assertEquals(2, usedSlotCount(cluster, "sup-0"));
+        assertEquals(1, usedSlotCount(cluster, "sup-1"));
+        assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)));
+        assertFalse(cluster.needsScheduling(firstTopology(cluster)));
+    }
+
+    @Test
+    public void redistributeRelocatesAtMostMaxFreeWorkersPerTopology() {
+        Cluster fixture = buildClusterWithIdleSupervisor(true, 1);
+        // Starting layout: (sup-0, sup-1, sup-2) = (2, 1, 0).
+        assertEquals(2, usedSlotCount(fixture, "sup-0"));
+        assertEquals(1, usedSlotCount(fixture, "sup-1"));
+        assertEquals(0, usedSlotCount(fixture, "sup-2"));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(fixture.getTopologies(), fixture);
+
+        // max-free=1 permits one relocation only: the most-loaded supervisor (sup-0) donates it to idle sup-2.
+        assertEquals(1, usedSlotCount(fixture, "sup-0"));
+        assertEquals(1, usedSlotCount(fixture, "sup-1"));
+        assertEquals(1, usedSlotCount(fixture, "sup-2"));
+        assertEquals(3, fixture.getAssignedNumWorkers(firstTopology(fixture)));
+    }
+
+    @Test
+    public void redistributeNeverDrainsSupervisorToZero() {
+        // Two workers, one each on sup-0 and sup-1; sup-2 is idle and max-free (5) is deliberately generous.
+        TopologyDetails topo = makeTopologyDetails(TOPO_ID, 2);
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(TOPO_ID, buildAssignment(topo, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-1", 0),
+        }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(TOPO_ID, topo);
+        Topologies topologies = new Topologies(topoById);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 5));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // The budget is floor(2/3) = 0, so the topology is skipped and neither source supervisor loses its worker.
+        assertEquals(1, usedSlotCount(cluster, "sup-0"));
+        assertEquals(1, usedSlotCount(cluster, "sup-1"));
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+    }
+
+    @Test
+    public void scheduleTopologiesEvenly_movesOneWorkerToIdleSupervisor() {
+        Cluster fixture = buildClusterWithIdleSupervisor(true, 1);
+
+        EvenScheduler.scheduleTopologiesEvenly(fixture.getTopologies(), fixture);
+
+        // The previously idle supervisor (sup-2) now runs exactly one worker.
+        assertEquals(1, usedSlotCount(fixture, "sup-2"));
+
+        // Across all supervisors the number of occupied slots is still 3, matching the topology's numWorkers.
+        int onSup0 = usedSlotCount(fixture, "sup-0");
+        int onSup1 = usedSlotCount(fixture, "sup-1");
+        int onSup2 = usedSlotCount(fixture, "sup-2");
+        assertEquals(3, onSup0 + onSup1 + onSup2);
+        assertEquals(3, fixture.getAssignedNumWorkers(firstTopology(fixture)),
+                "relocation must preserve the topology's declared worker count, not just keep 3 slots occupied");
+    }
+
+    /**
+     * A topology with a single worker must stay put even when idle supervisors exist: {@code floor(1 / N) = 0} for
+     * every N >= 2, so its drain budget is zero no matter how many supervisors are idle. Without this guard the lone
+     * worker would ping-pong between supervisors every monitor cycle.
+     */
+    @Test
+    public void singleWorkerTopology_doesNotMoveDespiteIdleSupervisors() {
+        String id = "topo-single";
+        TopologyDetails topo = makeTopologyDetails(id, 1, 1);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(id, buildAssignment(topo, new WorkerSlot[]{ new WorkerSlot("sup-0", 0) }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(id, topo);
+        Topologies topologies = new Topologies(topoById);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        EvenScheduler.scheduleTopologiesEvenly(topologies, cluster);
+
+        assertEquals(1, usedSlotCount(cluster, "sup-0"));
+        assertEquals(0, usedSlotCount(cluster, "sup-1"));
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+    }
+
+    /**
+     * An 8-worker topology starts out as (4, 4, 0). With max-free unbounded the idle supervisor's share is
+     * floor(numWorkers / numSupervisors) = 2 workers, so a single round is expected to end at (3, 3, 2). Once no
+     * supervisor is idle any more, a further round has nothing left to relocate.
+     */
+    @Test
+    public void evenDistributionInOneRound_unboundedMaxFree() {
+        String id = "topo-even";
+        TopologyDetails topo = makeTopologyDetails(id, 8, 4);
+        WorkerSlot[] startingSlots = {
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+                new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3),
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+                new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3),
+        };
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(id, buildAssignment(topo, startingSlots));
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(id, topo);
+        Topologies topologies = new Topologies(topoById);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        assertEquals(4, usedSlotCount(cluster, "sup-0"));
+        assertEquals(4, usedSlotCount(cluster, "sup-1"));
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+
+        EvenScheduler.scheduleTopologiesEvenly(topologies, cluster);
+
+        // One round hands the idle supervisor exactly floor(8/3) = 2 workers without losing any worker overall.
+        assertEquals(2, usedSlotCount(cluster, "sup-2"));
+        assertEquals(8, usedSlotCount(cluster, "sup-0")
+                + usedSlotCount(cluster, "sup-1")
+                + usedSlotCount(cluster, "sup-2"));
+        assertEquals(8, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(id)),
+                "relocation must preserve the declared worker count of an 8-worker topology");
+        // With no idle supervisor left, a second pass has nothing to move -- the trigger does not refire.
+        EvenScheduler.redistributeOntoIdleSupervisors(cluster.getTopologies(), cluster);
+        assertEquals(2, usedSlotCount(cluster, "sup-2"));
+        assertEquals(8, usedSlotCount(cluster, "sup-0")
+                + usedSlotCount(cluster, "sup-1")
+                + usedSlotCount(cluster, "sup-2"));
+    }
+
+    /**
+     * Two equally-sized topologies take turns on the returning supervisor: each gives up exactly one worker, so sup-2
+     * ends up running workers of both topologies — the same per-supervisor workload mix a fresh submission would
+     * produce.
+     */
+    @Test
+    public void multipleTopologies_shareIdleSlotsRoundRobin() {
+        TopologyDetails topoA = makeTopologyDetails("topo-A", 4, 2);
+        TopologyDetails topoB = makeTopologyDetails("topo-B", 4, 2);
+
+        // Both topologies run two workers on sup-0 and two on sup-1; sup-2 hosts nothing.
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put("topo-A", buildAssignment(topoA, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+        }));
+        assignmentById.put("topo-B", buildAssignment(topoB, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3),
+                new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3),
+        }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put("topo-A", topoA);
+        topoById.put("topo-B", topoB);
+        Topologies topologies = new Topologies(topoById);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // floor(4/3)=1 per topology and two topologies, so sup-2 receives one worker from each in round-robin order.
+        // Asserting exact counts (rather than >= 1) is what enforces the fairness: if a broken inner loop let the
+        // first topology take both idle slots, topo-B would show 0 workers on sup-2 here.
+        assertEquals(2, usedSlotCount(cluster, "sup-2"));
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-A", "sup-2"));
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-B", "sup-2"));
+        // Each topology still has all 4 of its workers assigned; exactly one worker per topology changed supervisor.
+        assertEquals(4, cluster.getAssignedNumWorkers(topoA));
+        assertEquals(4, cluster.getAssignedNumWorkers(topoB));
+        // A donor supervisor is never emptied completely (that would turn it into the next round's idle target).
+        assertTrue(usedSlotCount(cluster, "sup-0") > 0);
+        assertTrue(usedSlotCount(cluster, "sup-1") > 0);
+    }
+
+    /**
+     * Boundary check for the flap guard: 3 stable rounds at a 3s monitor frequency mean a returning supervisor needs
+     * at least 9 seconds of uptime before it may receive workers. {@code uptime == requiredUptime} is the first value
+     * that moves, which pins the inclusive contract {@code uptimeSecs >= minStableRounds * monitorFrequencySecs}.
+     */
+    @ParameterizedTest
+    @CsvSource({
+            "8, false",   // one second short of the threshold: sup-2 is skipped
+            "9, true",    // exactly the threshold: sup-2 is eligible
+            "10, true",   // past the threshold: sup-2 is eligible
+    })
+    public void flapGuardHonorsMinStableRoundBoundary(long sup2UptimeSecs, boolean expectMove) {
+        Map<String, Object> conf = evenRebalanceConf(true, 1);
+        conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3);
+        conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3);
+
+        // sup-0 and sup-1 are long-lived (100s); only sup-2 carries the uptime under test.
+        Map<String, SupervisorDetails> supervisors = genSupervisorsWithUptime(3, 4, 100);
+        supervisors.put("sup-2", supervisor("sup-2", "host-2", 4, sup2UptimeSecs));
+        Cluster cluster = buildClusterWithIdleSupervisor(supervisors, conf);
+
+        EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster);
+
+        // Required uptime is 3 rounds * 3 seconds = 9 seconds; sup-2 receives a worker only at or above it.
+        if (!expectMove) {
+            assertEquals(2, usedSlotCount(cluster, "sup-0"));
+            assertEquals(1, usedSlotCount(cluster, "sup-1"));
+            assertEquals(0, usedSlotCount(cluster, "sup-2"));
+        } else {
+            assertEquals(1, usedSlotCount(cluster, "sup-2"));
+            assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster)));
+        }
+    }
+
+    @Test
+    public void donorTieBreaksBySupervisorIdWhenWorkerCountsTie() {
+        TopologyDetails topo = makeTopologyDetails("topo-tie", 4, 4);
+
+        // sup-0 and sup-1 both start with two workers, so worker count alone cannot pick the donor.
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put("topo-tie", buildAssignment(topo, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+        }));
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put("topo-tie", topo);
+        Topologies topologies = new Topologies(topoById);
+        Cluster cluster = newCluster(genSupervisorsWithUptime(3, 4, 100), assignmentById, topologies,
+                evenRebalanceConf(true, 1));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-0"),
+                "sup-0 and sup-1 started with two workers each; lexicographic tie-break chooses sup-0 as donor");
+        assertEquals(2, supervisorWorkerCount(cluster, "topo-tie", "sup-1"));
+        assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-2"));
+        assertEquals(4, cluster.getAssignedNumWorkers(topo),
+                "the tie-break relocation preserves the topology's declared worker count");
+    }
+
+    /**
+     * When two supervisors are idle at once, a single topology may relocate
+     * {@code floor(numWorkers / nonBlacklistedSupervisorCount) * idleSupervisorCount} workers in one round; this test
+     * exercises the {@code * idleSupervisorCount} factor of that budget. Four non-blacklisted supervisors (two busy, two
+     * idle) and an 8-worker topology give {@code floor(8 / 4) * 2 = 4}; a regression that dropped the multiplier would
+     * compute 2 and relocate only half as many workers. All other tests have exactly one usable idle supervisor, which
+     * makes this fixture the one that pins the multiplier down.
+     */
+    @Test
+    public void twoIdleSupervisors_budgetScalesWithIdleSupervisorCount() {
+        String id = "topo-two-idle";
+        TopologyDetails topo = makeTopologyDetails(id, 8, 4);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(id, buildAssignment(topo, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+                new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3),
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+                new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3),
+        }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(id, topo);
+        Topologies topologies = new Topologies(topoById);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(4, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 0));
+
+        // Precondition: neither sup-2 nor sup-3 hosts a worker yet.
+        assertEquals(0, usedSlotCount(cluster, "sup-2"));
+        assertEquals(0, usedSlotCount(cluster, "sup-3"));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // budget = floor(8 / 4 non-blacklisted) * 2 idle = 4 relocations onto the idle supervisors. Checking the sum
+        // instead of one supervisor keeps the test independent of the order in which idle slots are filled. Without
+        // the * idleSupervisorCount factor the budget would be 2, only 2 workers would move, and this would fail.
+        assertEquals(4, usedSlotCount(cluster, "sup-2") + usedSlotCount(cluster, "sup-3"),
+                "budget must scale with the number of simultaneously-idle supervisors: floor(8/4) * 2 = 4");
+        assertEquals(4, usedSlotCount(cluster, "sup-0") + usedSlotCount(cluster, "sup-1"));
+        assertEquals(8, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(id)),
+                "relocation preserves the declared worker count");
+    }
+
+    /**
+     * {@code max.free.per.topology} has to be able to bind more tightly than the even-distribution budget. For 3
+     * supervisors and a 6-worker topology the even budget is {@code floor(6 / 3) * 1 = 2}, yet {@code maxFree = 1}
+     * clamps it to one relocation. No other fixture makes the {@code Math.min(target, maxFree)} clamp the strictly
+     * binding constraint -- without the clamp 2 workers would be relocated and sup-2 would end up with 2, failing the
+     * assertion below.
+     */
+    @Test
+    public void maxFreePerTopologyClampsBelowEvenBudget() {
+        String id = "topo-clamp";
+        TopologyDetails topo = makeTopologyDetails(id, 6, 3);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(id, buildAssignment(topo, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), new WorkerSlot("sup-0", 2),
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2),
+        }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(id, topo);
+        Topologies topologies = new Topologies(topoById);
+
+        Map<String, SupervisorDetails> supervisors = TestUtilsForBlacklistScheduler.genSupervisors(3, 4);
+        Cluster cluster = newCluster(supervisors, assignmentById, topologies, evenRebalanceConf(true, 1));
+
+        EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
+
+        // The even budget is floor(6/3)*1 = 2, but maxFree=1 limits the round to one relocation. If the
+        // Math.min(target, maxFree) clamp were missing, two workers would move and sup-2 would hold 2.
+        assertEquals(1, usedSlotCount(cluster, "sup-2"),
+                "max.free.per.topology must clamp the even-distribution budget of 2 down to 1");
+        assertEquals(6, cluster.getAssignedNumWorkers(cluster.getTopologies().getById(id)));
+        assertTrue(usedSlotCount(cluster, "sup-0") > 0);
+        assertTrue(usedSlotCount(cluster, "sup-1") > 0);
+    }
+
+    @Test
+    public void blacklistedIdleSupervisorIsNotReusableTarget() {
+        Cluster fixture = buildClusterWithIdleSupervisor(true, 1);
+        fixture.blacklistHost("host-2");
+
+        SupervisorDetails idleButBlacklisted = fixture.getSupervisorById("sup-2");
+        assertFalse(fixture.isIdleSupervisorAvailableForEvenRebalance(idleButBlacklisted),
+                "IsolationScheduler represents reserved hosts by blacklisting them before delegating to DefaultScheduler");
+        EvenScheduler.scheduleTopologiesEvenly(fixture.getTopologies(), fixture);
+
+        // A blacklisted supervisor is never picked as a target even when idle, so the placement stays as it was.
+        assertEquals(2, usedSlotCount(fixture, "sup-0"));
+        assertEquals(1, usedSlotCount(fixture, "sup-1"));
+        assertEquals(0, usedSlotCount(fixture, "sup-2"));
+        assertEquals(3, fixture.getAssignedNumWorkers(firstTopology(fixture)));
+    }
+
+    @Test
+    public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() {
+        TopologyDetails isolatedTopo = makeTopologyDetails("topo-isolated", 2, 2);
+        TopologyDetails leftoverTopo = makeTopologyDetails("topo-regular", 3, 3);
+
+        // The isolated topology lives on sup-0, the leftover one on sup-1; sup-2 is idle.
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(isolatedTopo.getId(), buildAssignment(isolatedTopo, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+        }));
+        assignmentById.put(leftoverTopo.getId(), buildAssignment(leftoverTopo, new WorkerSlot[]{
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2),
+        }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(isolatedTopo.getId(), isolatedTopo);
+        topoById.put(leftoverTopo.getId(), leftoverTopo);
+        Cluster cluster = newCluster(genSupervisorsWithUptime(3, 4, 100), assignmentById, new Topologies(topoById),
+                evenRebalanceConf(true, 0));
+
+        cluster.blacklistHost("host-0");
+        DefaultScheduler.defaultSchedule(new Topologies(leftoverTopo), cluster);
+
+        assertEquals(2, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-0"),
+                "the isolated topology is not in the leftover topology set, so DefaultScheduler must not move it");
+        assertEquals(0, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-2"));
+        assertEquals(2, supervisorWorkerCount(cluster, leftoverTopo.getId(), "sup-1"));
+        assertEquals(1, supervisorWorkerCount(cluster, leftoverTopo.getId(), "sup-2"));
+        // Declared worker counts are intact for both: the leftover topology was relocated, the excluded one untouched.
+        assertEquals(3, cluster.getAssignedNumWorkers(leftoverTopo));
+        assertEquals(2, cluster.getAssignedNumWorkers(isolatedTopo));
+    }
+
+    @Test
+    public void isolationSchedulerOnlyRelocatesLeftoverTopologyOntoNonIsolatedIdleSupervisor() {
+        TopologyDetails isolatedTopo = makeTopologyDetails("topo-isolated", 1, 1);
+        TopologyDetails leftoverTopo = makeTopologyDetails("topo-regular", 3, 3);
+
+        Map<String, SchedulerAssignmentImpl> assignmentById = new HashMap<>();
+        assignmentById.put(isolatedTopo.getId(), buildAssignment(isolatedTopo, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0),
+        }));
+        assignmentById.put(leftoverTopo.getId(), buildAssignment(leftoverTopo, new WorkerSlot[]{
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2),
+        }));
+
+        Map<String, TopologyDetails> topoById = new HashMap<>();
+        topoById.put(isolatedTopo.getId(), isolatedTopo);
+        topoById.put(leftoverTopo.getId(), leftoverTopo);
+        Topologies topologies = new Topologies(topoById);
+
+        // The isolation config maps the isolated topology's name to 1 machine; the other topology is not listed.
+        Map<String, Object> conf = evenRebalanceConf(true, 0);
+        conf.put(DaemonConfig.ISOLATION_SCHEDULER_MACHINES, Collections.singletonMap(isolatedTopo.getName(), 1));
+        Cluster cluster = newCluster(genSupervisorsWithUptime(3, 4, 100), assignmentById, topologies, conf);
+
+        IsolationScheduler isolationScheduler = new IsolationScheduler();
+        isolationScheduler.prepare(conf, new StormMetricsRegistry());
+        isolationScheduler.schedule(topologies, cluster);
+
+        assertEquals(1, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-0"),
+                "the already-isolated topology remains on its isolated host and is not selected as a donor");
+        assertEquals(0, supervisorWorkerCount(cluster, isolatedTopo.getId(), "sup-2"));
+        assertEquals(2, supervisorWorkerCount(cluster, leftoverTopo.getId(), "sup-1"));
+        assertEquals(1, supervisorWorkerCount(cluster, leftoverTopo.getId(), "sup-2"),
+                "only the leftover regular topology is allowed to move onto the non-isolated idle supervisor");
+        // Declared worker counts are intact for both the relocated leftover topology and the untouched isolated one.
+        assertEquals(3, cluster.getAssignedNumWorkers(leftoverTopo));
+        assertEquals(1, cluster.getAssignedNumWorkers(isolatedTopo));
+    }
+
+    /**
+     * IsolationScheduler reserves a host by blacklisting it before delegating the remaining (non-isolated) topologies to
+     * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}. Here the isolated topology is down -- it has no
+     * assigned workers at all -- yet its reserved host (sup-2) must not be treated as an idle relocation target even
+     * though it has zero used slots. The leftover regular topology is rebalanced onto the genuinely idle, non-reserved
+     * sup-3 and never onto the reserved sup-2.
+     */
+    @Test
+    public void reservedHostForDownIsolatedTopologyIsNotTreatedAsIdle() {
+        Map<String, SupervisorDetails> supMap = genSupervisorsWithUptime(4, 4, 100);
+        TopologyDetails isolated = makeTopologyDetails("topo-isolated", 1, 1);
+        TopologyDetails regular = makeTopologyDetails("topo-regular", 4, 4);
+
+        Map<String, SchedulerAssignmentImpl> assignments = new HashMap<>();
+        // The isolated topology is down: it has no assignment at all.
+        assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{
+                new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1),
+                new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1),
+        }));
+
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        topoMap.put(isolated.getId(), isolated);
+        topoMap.put(regular.getId(), regular);
+        Topologies allTopologies = new Topologies(topoMap);
+        Cluster cluster = newCluster(supMap, assignments, allTopologies, evenRebalanceConf(true, 0));
+
+        // sup-2 is reserved for the (down) isolated topology -- IsolationScheduler represents this by blacklisting it.
+        cluster.blacklistHost("host-2");
+
+        assertFalse(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-2")),
+                "a blacklisted reserved host is never an even-rebalance target, even with zero used slots");
+        assertTrue(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-3")),
+                "the non-reserved idle supervisor is available");
+
+        // IsolationScheduler delegates the leftover (non-isolated) topologies to DefaultScheduler with the reserved
+        // host already blacklisted.
+        DefaultScheduler.defaultSchedule(new Topologies(regular), cluster);
+
+        assertEquals(0, usedSlotCount(cluster, "sup-2"),
+                "the reserved host stays idle: the down isolated topology's machine is not repopulated by rebalance");
+        assertEquals(0, supervisorWorkerCount(cluster, regular.getId(), "sup-2"));
+        assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-3"),
+                "the leftover regular topology rebalances onto the genuinely idle, non-reserved supervisor");
+        assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-0"));
+        assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1"));
+        assertEquals(4, cluster.getAssignedNumWorkers(regular),
+                "the leftover topology keeps all 4 workers; the move never loses executors");
+        assertEquals(0, cluster.getUsedSlotsByTopologyId(isolated.getId()).size(),
+                "the isolated topology is down and is never scheduled by the leftover path");
+    }
+
+    /**
+     * Builds an assignment for {@code topology} that spreads its executors over {@code slots} round-robin.
+     * Executors are first ordered by start task, so the executor at position {@code i} of that ordering is mapped
+     * to {@code slots[i % slots.length]}.
+     *
+     * @param topology topology whose executors are assigned
+     * @param slots    target slots, cycled through in array order; must be non-empty when the topology has
+     *                 executors, otherwise the modulo by {@code slots.length} throws {@link ArithmeticException}
+     * @return a new assignment for the topology's id holding the executor-to-slot map; the two remaining
+     *         constructor arguments are passed as {@code null}
+     */
+    private SchedulerAssignmentImpl buildAssignment(TopologyDetails topology, WorkerSlot[] slots) {
+        List<ExecutorDetails> execs = new LinkedList<>(topology.getExecutors());
+        execs.sort((a, b) -> Integer.compare(a.getStartTask(), b.getStartTask()));
+        Map<ExecutorDetails, WorkerSlot> execToSlot = new HashMap<>();
+        // Walk the list with its iterator: LinkedList.get(i) is linear per call, so indexed access would make
+        // this loop quadratic in the number of executors.
+        int position = 0;
+        for (ExecutorDetails exec : execs) {
+            execToSlot.put(exec, slots[position % slots.length]);
+            position++;
+        }
+        return new SchedulerAssignmentImpl(topology.getId(), execToSlot, null, null);
+    }
+
+    /**
+     * Counts the slots used by {@code topologyId} in {@code cluster} whose node id equals {@code supervisorId}.
+     */
+    private int supervisorWorkerCount(Cluster cluster, String topologyId, String supervisorId) {
+        long onSupervisor = cluster.getUsedSlotsByTopologyId(topologyId).stream()
+                .filter(used -> used.getNodeId().equals(supervisorId))
+                .count();
+        return (int) onSupervisor;
+    }
+}