STORM-4051 - Scheduler needs to include acker memory for topology resources (#3633)
* Scheduler needs to include acker memory for topology resources
* address review comments
---------
Co-authored-by: Aaron Gresch <agresch@yahooinc.com>
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 953e52a..89d6849 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -627,14 +627,12 @@
double cpuAvailable = resourcesAvailable.getTotalCpu();
if (cpuAdded > cpuAvailable) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
- td.getName(),
- exec,
- ws,
- cpuAdded,
- cpuAvailable);
- }
+ LOG.debug("Could not schedule {}:{} on {} not enough CPU {} > {}",
+ td.getName(),
+ exec,
+ ws,
+ cpuAdded,
+ cpuAvailable);
return false;
}
@@ -642,25 +640,21 @@
double memoryAvailable = resourcesAvailable.getTotalMemoryMb();
if (memoryAdded > memoryAvailable) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}",
- td.getName(),
- exec,
- ws,
- memoryAdded,
- memoryAvailable);
- }
+ LOG.debug("Could not schedule {}:{} on {} not enough Mem {} > {}",
+ td.getName(),
+ exec,
+ ws,
+ memoryAdded,
+ memoryAvailable);
return false;
}
if (afterOnHeap > maxHeap) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Could not schedule {}:{} on {} HEAP would be too large {} > {}",
- td.getName(),
- exec,
- ws,
- afterOnHeap,
- maxHeap);
- }
+ LOG.debug("Could not schedule {}:{} on {} HEAP would be too large {} > {}",
+ td.getName(),
+ exec,
+ ws,
+ afterOnHeap,
+ maxHeap);
return false;
}
return true;
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 6928eb1..a70ae86 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -708,9 +708,27 @@
double memoryRequirement = entry.getValue().getOnHeapMemoryMb();
totalMemoryRequired += memoryRequirement * parallelism;
}
+
+ double ackerMem = getTotalAckerExecutorMemoryUsageForTopo(topology, topoConf);
+ totalMemoryRequired += ackerMem;
+
return totalMemoryRequired;
}
+ private static double getTotalAckerExecutorMemoryUsageForTopo(
+ StormTopology topology, Map<String, Object> topologyConf)
+ throws InvalidTopologyException {
+ topology = StormCommon.systemTopology(topologyConf, topology);
+ Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topologyConf);
+ NormalizedResourceRequest entry = boltResources.get(Acker.ACKER_COMPONENT_ID);
+ if (entry == null) {
+ return 0.0d;
+ }
+ Map<String, Integer> componentParallelism = getComponentParallelism(topologyConf, topology);
+ int parallelism = componentParallelism.getOrDefault(Acker.ACKER_COMPONENT_ID, 1);
+ return entry.getTotalMemoryMb() * parallelism;
+ }
+
public static Map<String, Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology)
throws InvalidTopologyException {
Map<String, Integer> ret = new HashMap<>();
@@ -1398,6 +1416,9 @@
topology = StormCommon.systemTopology(topologyConf, topology);
Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topologyConf);
NormalizedResourceRequest entry = boltResources.get(Acker.ACKER_COMPONENT_ID);
+ if (entry == null) {
+ return 0.0d;
+ }
return entry.getTotalMemoryMb();
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index bed28ce..abc5f25 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -20,10 +20,19 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.config.Configurator;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.DefaultScheduler;
+import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
@@ -31,11 +40,13 @@
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategyOld;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld;
import org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy;
+import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -74,6 +85,118 @@
}
@Test
+ public void testBlacklistResumeWhenAckersWontFit() throws InvalidTopologyException {
+ // 3 supervisors exist with 4 slots, 2 are blacklisted
+ // topology with given worker heap size would fit in 4 slots if ignoring ackers, needs 5 slots with ackers.
+ // verify that one of the supervisors will be resumed and topology will schedule.
+
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 300);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 3);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 1800);
+ config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 100);
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT, false);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 3);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128);
+ config.put(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY, "org.apache.storm.scheduler.blacklist.strategies.RasBlacklistStrategy");
+ config.put(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER, 1);
+ config.setNumWorkers(1);
+ config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 4);
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, "org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy");
+ config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 512);
+ config.put(Config.WORKER_HEAP_MEMORY_MB, 768);
+ config.put(Config.TOPOLOGY_NAME, "testTopology");
+
+ INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
+ Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4, 400.0d, 4096.0d);
+ Topologies noTopologies = new Topologies();
+ Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<>(), noTopologies, config);
+
+ scheduler = new BlacklistScheduler(new ResourceAwareScheduler());
+ scheduler.prepare(config, metricsRegistry);
+ scheduler.schedule(noTopologies, cluster);
+
+ Map<String, SupervisorDetails> removedSup0 = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0");
+ Map<String, SupervisorDetails> removedSup0Sup1 = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(removedSup0, "sup-1");
+
+ cluster = new Cluster(iNimbus, resourceMetrics, removedSup0Sup1,
+ TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), noTopologies, config);
+ scheduler.schedule(noTopologies, cluster);
+ scheduler.schedule(noTopologies, cluster);
+ scheduler.schedule(noTopologies, cluster);
+
+ // 2 supervisors blacklisted at this point. Let's schedule the topology.
+
+ Map<String, TopologyDetails> topoMap = new HashMap<>();
+ TopologyDetails topo1 = createResourceTopo(config);
+ topoMap.put(topo1.getId(), topo1);
+ Topologies topologies = new Topologies(topoMap);
+
+ cluster = new Cluster(iNimbus, resourceMetrics, supMap,
+ TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+ boolean enableTraceLogging = false; // for scheduling debug
+ if (enableTraceLogging) {
+ Configurator.setAllLevels(LogManager.getRootLogger().getName(), Level.TRACE);
+ }
+ scheduler.schedule(topologies, cluster);
+
+ // topology should be fully scheduled with 1 host remaining blacklisted
+ String topoScheduleStatus = cluster.getStatus("testTopology-id");
+ assertTrue(topoScheduleStatus.contains("Running - Fully Scheduled"));
+ assertEquals(1, cluster.getBlacklistedHosts().size());
+ }
+
+ public TopologyDetails createResourceTopo(Config conf) throws InvalidTopologyException {
+ int spoutParallelism = 6;
+ int boltParallelism = 5;
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("word", new TestUtilsForResourceAwareScheduler.TestSpout(),
+ spoutParallelism);
+ builder.setBolt("exclaim1", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("word");
+ builder.setBolt("exclaim2", new TestUtilsForResourceAwareScheduler.TestBolt(),
+ boltParallelism).shuffleGrouping("exclaim1");
+
+ StormTopology stormTopology = builder.createTopology();
+
+ TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormTopology, 0,
+ genExecsAndComps(StormCommon.systemTopology(conf, stormTopology)), currentTime, "user");
+ return topo;
+ }
+
+ public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology) {
+ Map<ExecutorDetails, String> retMap = new HashMap<>();
+ int startTask = 1;
+ int endTask = 1;
+ for (Map.Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ SpoutSpec spout = entry.getValue();
+ String spoutId = entry.getKey();
+ int spoutParallelism = spout.get_common().get_parallelism_hint();
+ for (int i = 0; i < spoutParallelism; i++) {
+ retMap.put(new ExecutorDetails(startTask, endTask), spoutId);
+ startTask++;
+ endTask++;
+ }
+ }
+
+ for (Map.Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
+ String boltId = entry.getKey();
+ Bolt bolt = entry.getValue();
+ int boltParallelism = bolt.get_common().get_parallelism_hint();
+ for (int i = 0; i < boltParallelism; i++) {
+ retMap.put(new ExecutorDetails(startTask, endTask), boltId);
+ startTask++;
+ endTask++;
+ }
+ }
+ return retMap;
+ }
+
+ @Test
public void TestBadSupervisor() {
INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
index 7d2f3fe..403cb99 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -18,6 +18,7 @@
package org.apache.storm.scheduler.blacklist;
import org.apache.storm.Config;
+import org.apache.storm.Constants;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
@@ -112,6 +113,22 @@
return retList;
}
+ public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, double cpu, double memory) {
+ Map<String, Double> totalResources = new HashMap<>();
+ totalResources.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+ totalResources.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, memory);
+ Map<String, SupervisorDetails> retList = new HashMap<>();
+ for (int i = 0; i < numSup; i++) {
+ List<Number> ports = new LinkedList<>();
+ for (int j = 0; j < numPorts; j++) {
+ ports.add(j);
+ }
+ SupervisorDetails sup = new SupervisorDetails("sup-" + i, "host-" + i, null, ports, totalResources);
+ retList.put(sup.getId(), sup);
+ }
+ return retList;
+ }
+
public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {