STORM-4051 - Scheduler needs to include acker memory for topology resources (#3633)

* Scheduler needs to include acker memory for topology resources

* address review comments

---------

Co-authored-by: Aaron Gresch <agresch@yahooinc.com>
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 953e52a..89d6849 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -627,14 +627,12 @@
         double cpuAvailable = resourcesAvailable.getTotalCpu();
 
         if (cpuAdded > cpuAvailable) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Could not schedule {}:{} on {} not enough CPU {} > {}",
-                        td.getName(),
-                        exec,
-                        ws,
-                        cpuAdded,
-                        cpuAvailable);
-            }
+            LOG.debug("Could not schedule {}:{} on {} not enough CPU {} > {}",
+                    td.getName(),
+                    exec,
+                    ws,
+                    cpuAdded,
+                    cpuAvailable);
             return false;
         }
 
@@ -642,25 +640,21 @@
         double memoryAvailable = resourcesAvailable.getTotalMemoryMb();
 
         if (memoryAdded > memoryAvailable) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Could not schedule {}:{} on {} not enough Mem {} > {}",
-                          td.getName(),
-                          exec,
-                          ws,
-                          memoryAdded,
-                          memoryAvailable);
-            }
+            LOG.debug("Could not schedule {}:{} on {} not enough Mem {} > {}",
+                      td.getName(),
+                      exec,
+                      ws,
+                      memoryAdded,
+                      memoryAvailable);
             return false;
         }
         if (afterOnHeap > maxHeap) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Could not schedule {}:{} on {} HEAP would be too large {} > {}",
-                          td.getName(),
-                          exec,
-                          ws,
-                          afterOnHeap,
-                          maxHeap);
-            }
+            LOG.debug("Could not schedule {}:{} on {} HEAP would be too large {} > {}",
+                      td.getName(),
+                      exec,
+                      ws,
+                      afterOnHeap,
+                      maxHeap);
             return false;
         }
         return true;
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 6928eb1..a70ae86 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -708,9 +708,27 @@
             double memoryRequirement = entry.getValue().getOnHeapMemoryMb();
             totalMemoryRequired += memoryRequirement * parallelism;
         }
+
+        double ackerMem = getTotalAckerExecutorMemoryUsageForTopo(topology, topoConf);
+        totalMemoryRequired += ackerMem;
+
         return totalMemoryRequired;
     }
 
+    private static double getTotalAckerExecutorMemoryUsageForTopo(
+            StormTopology topology, Map<String, Object> topologyConf)
+            throws InvalidTopologyException {
+        // look the acker up in the system topology built from the user topology
+        StormTopology systemTopo = StormCommon.systemTopology(topologyConf, topology);
+        NormalizedResourceRequest ackerRequest =
+                ResourceUtils.getBoltsResources(systemTopo, topologyConf).get(Acker.ACKER_COMPONENT_ID);
+        if (ackerRequest == null) {
+            return 0.0d;
+        }
+        int ackerCount = getComponentParallelism(topologyConf, systemTopo).getOrDefault(Acker.ACKER_COMPONENT_ID, 1);
+        return ackerRequest.getTotalMemoryMb() * ackerCount;
+    }
+
     public static Map<String, Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology)
         throws InvalidTopologyException {
         Map<String, Integer> ret = new HashMap<>();
@@ -1398,6 +1416,9 @@
         topology = StormCommon.systemTopology(topologyConf, topology);
         Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topologyConf);
         NormalizedResourceRequest entry = boltResources.get(Acker.ACKER_COMPONENT_ID);
+        if (entry == null) {
+            return 0.0d;
+        }
         return entry.getTotalMemoryMb();
     }
 
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
index bed28ce..abc5f25 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestBlacklistScheduler.java
@@ -20,10 +20,19 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.DefaultScheduler;
+import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.INimbus;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.SchedulerAssignmentImpl;
@@ -31,11 +40,13 @@
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
+import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
 import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategyOld;
 import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld;
 import org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -74,6 +85,118 @@
     }
 
     @Test
+    public void testBlacklistResumeWhenAckersWontFit() throws InvalidTopologyException {
+        // 3 supervisors exist with 4 slots each; 2 of the supervisors get blacklisted.
+        // With the given worker heap size the topology would fit in 4 slots ignoring ackers, but needs 5 with ackers.
+        // Verify that one of the supervisors is resumed and the topology gets scheduled.
+
+        Config config = new Config();
+        config.putAll(Utils.readDefaultConfig());
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME, 300);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT, 3);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME, 1800);
+        config.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, 100);
+        config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT, false);
+        config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128);
+        config.put(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY, "org.apache.storm.scheduler.blacklist.strategies.RasBlacklistStrategy");
+        config.put(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER, 1);
+        config.setNumWorkers(1);
+        config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 4);
+        config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, "org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy");
+        config.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 512);
+        config.put(Config.WORKER_HEAP_MEMORY_MB, 768);
+        config.put(Config.TOPOLOGY_NAME, "testTopology");
+
+        INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry);
+        Map<String, SupervisorDetails> supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4, 400.0d, 4096.0d);
+        Topologies noTopologies = new Topologies();
+        Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<>(), noTopologies, config);
+
+        scheduler = new BlacklistScheduler(new ResourceAwareScheduler());
+        scheduler.prepare(config, metricsRegistry);
+        scheduler.schedule(noTopologies, cluster);
+
+        // drop sup-0 and sup-1, then run enough scheduling rounds for both to be blacklisted
+        Map<String, SupervisorDetails> removedSup0 = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(supMap, "sup-0");
+        Map<String, SupervisorDetails> removedSup0Sup1 = TestUtilsForBlacklistScheduler.removeSupervisorFromSupervisors(removedSup0, "sup-1");
+
+        cluster = new Cluster(iNimbus, resourceMetrics, removedSup0Sup1,
+                TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), noTopologies, config);
+        scheduler.schedule(noTopologies, cluster);
+        scheduler.schedule(noTopologies, cluster);
+        scheduler.schedule(noTopologies, cluster);
+
+        // 2 supervisors blacklisted at this point.  Let's schedule the topology.
+
+        Map<String, TopologyDetails> topoMap = new HashMap<>();
+        TopologyDetails topo1 = createResourceTopo(config);
+        topoMap.put(topo1.getId(), topo1);
+        Topologies topologies = new Topologies(topoMap);
+
+        cluster = new Cluster(iNimbus, resourceMetrics, supMap,
+                TestUtilsForBlacklistScheduler.assignmentMapToImpl(cluster.getAssignments()), topologies, config);
+        boolean enableTraceLogging = false; // flip to true when debugging scheduling decisions
+        if (enableTraceLogging) {
+            Configurator.setAllLevels(LogManager.getRootLogger().getName(), Level.TRACE);
+        }
+        scheduler.schedule(topologies, cluster);
+
+        // topology should be fully scheduled with 1 host remaining blacklisted
+        String topoScheduleStatus = cluster.getStatus(topo1.getId());
+        assertTrue(topoScheduleStatus.contains("Running - Fully Scheduled"));
+        assertEquals(1, cluster.getBlacklistedHosts().size());
+    }
+
+    /** Builds the three-component test topology and wraps it in a {@link TopologyDetails} with id "testTopology-id". */
+    public TopologyDetails createResourceTopo(Config conf) throws InvalidTopologyException {
+        final int spoutExecutors = 6;
+        final int boltExecutors = 5;
+        TopologyBuilder topoBuilder = new TopologyBuilder();
+        // spout "word" feeds "exclaim1", which in turn feeds "exclaim2"
+        topoBuilder.setSpout("word", new TestUtilsForResourceAwareScheduler.TestSpout(), spoutExecutors);
+        topoBuilder.setBolt("exclaim1", new TestUtilsForResourceAwareScheduler.TestBolt(), boltExecutors)
+                .shuffleGrouping("word");
+        topoBuilder.setBolt("exclaim2", new TestUtilsForResourceAwareScheduler.TestBolt(), boltExecutors)
+                .shuffleGrouping("exclaim1");
+        StormTopology userTopology = topoBuilder.createTopology();
+
+        // executors come from the system topology (presumably so system components such as ackers get executors too)
+        Map<ExecutorDetails, String> execToComp = genExecsAndComps(StormCommon.systemTopology(conf, userTopology));
+        return new TopologyDetails("testTopology-id", conf, userTopology, 0, execToComp, currentTime, "user");
+    }
+
+    /**
+     * Generates the executor to component-id mapping for a topology.
+     *
+     * <p>Each unit of a component's parallelism hint yields one executor that covers a single task.
+     * Task ids start at 1 and are handed out to all spouts first, then to all bolts.
+     */
+    public static Map<ExecutorDetails, String> genExecsAndComps(StormTopology topology) {
+        Map<ExecutorDetails, String> execToComponent = new HashMap<>();
+        int taskId = 1;
+
+        for (Map.Entry<String, SpoutSpec> spoutEntry : topology.get_spouts().entrySet()) {
+            String spoutName = spoutEntry.getKey();
+            int hint = spoutEntry.getValue().get_common().get_parallelism_hint();
+            for (int n = 0; n < hint; n++, taskId++) {
+                execToComponent.put(new ExecutorDetails(taskId, taskId), spoutName);
+            }
+        }
+
+        for (Map.Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) {
+            String boltName = boltEntry.getKey();
+            int hint = boltEntry.getValue().get_common().get_parallelism_hint();
+            for (int n = 0; n < hint; n++, taskId++) {
+                execToComponent.put(new ExecutorDetails(taskId, taskId), boltName);
+            }
+        }
+        return execToComponent;
+    }
+
+    @Test
     public void TestBadSupervisor() {
         INimbus iNimbus = new TestUtilsForBlacklistScheduler.INimbusTest();
 
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
index 7d2f3fe..403cb99 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/blacklist/TestUtilsForBlacklistScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.storm.scheduler.blacklist;
 
 import org.apache.storm.Config;
+import org.apache.storm.Constants;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
@@ -112,6 +113,22 @@
         return retList;
     }
 
+    /** Generates {@code numSup} supervisors (sup-i on host-i), each with ports 0..numPorts-1 and the given cpu/memory. */
+    public static Map<String, SupervisorDetails> genSupervisors(int numSup, int numPorts, double cpu, double memory) {
+        Map<String, Double> resources = new HashMap<>();
+        resources.put(Constants.COMMON_CPU_RESOURCE_NAME, cpu);
+        resources.put(Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, memory);
+        Map<String, SupervisorDetails> supervisors = new HashMap<>();
+        for (int supIdx = 0; supIdx < numSup; supIdx++) {
+            List<Number> slotPorts = new LinkedList<>();
+            for (int port = 0; port < numPorts; port++) {
+                slotPorts.add(port);
+            }
+            SupervisorDetails details = new SupervisorDetails("sup-" + supIdx, "host-" + supIdx, null, slotPorts, resources);
+            supervisors.put(details.getId(), details);
+        }
+        return supervisors;
+    }
 
     public static TopologyDetails getTopology(String name, Map<String, Object> config, int numSpout, int numBolt,
                                               int spoutParallelism, int boltParallelism, int launchTime, boolean blacklistEnable) {