[STORM-3743] Test all clusters in a uniform manner.
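
Parameterize TestLargeCluster so that every test iterates over all of the
predefined test clusters (largeCluster01, largeCluster02, largeCluster03)
instead of a single compile-time selection. Replace the cluster-name String
constants with an enum that supplies each cluster's name and resource path,
express the cluster01 supervisor layout as a SupervisorDistribution like the
other clusters, widen the clusterCapacity() accumulation to long to avoid
integer overflow, and clean up javadoc and loop style. Also drop the internal
cluster code-name comments from TestTopologyAnonymizerUtils.
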
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
index 3625110..b3d7584 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -68,12 +68,29 @@
 public class TestLargeCluster {
     private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
 
-    public static final String TEST_CLUSTER_01 = "largeCluster01";
-    public static final String TEST_CLUSTER_02 = "largeCluster02";
-    public static final String TEST_CLUSTER_03 = "largeCluster03";
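+    /**
+     * Test clusters, each backed by serialized topology and config resources under "clusterconf/".
+     */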
+    public enum TEST_CLUSTER_NAME {
+        TEST_CLUSTER_01("largeCluster01"),
+        TEST_CLUSTER_02("largeCluster02"),
+        TEST_CLUSTER_03("largeCluster03");
 
-    public static final String TEST_CLUSTER_NAME = TEST_CLUSTER_02;
-    public static final String TEST_RESOURCE_PATH = "clusterconf/" + TEST_CLUSTER_NAME;
+        private final String clusterName;
+
+        TEST_CLUSTER_NAME(String clusterName) {
+            this.clusterName = clusterName;
+        }
+
+        String getClusterName() {
+            return clusterName;
+        }
+
+        String getResourcePath() {
+            return "clusterconf/" + clusterName;
+        }
+    }
+
     public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "code.ser";
     public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "conf.ser";
 
@@ -93,8 +110,8 @@
      * files are sequential. Unpaired files may be ignored by the caller.
      *
      * @param path directory in which resources exist.
-     * @return
-     * @throws IOException
+     * @return list of resource file names.
+     * @throws IOException upon error in reading resources.
      */
     public static List<String> getResourceFiles(String path) throws IOException {
         List<String> fileNames = new ArrayList<>();
@@ -119,8 +136,8 @@
     /**
      * InputStream to read the fully qualified resource path.
      *
-     * @param resource
-     * @return
+     * @param resource path to read.
+     * @return InputStream of the resource being read.
      */
     public static InputStream getResourceAsStream(String resource) {
         final InputStream in = getContextClassLoader().getResourceAsStream(resource);
@@ -130,9 +147,9 @@
     /**
      * Read the contents of the fully qualified resource path.
      *
-     * @param resource
-     * @return
-     * @throws Exception
+     * @param resource to read.
+     * @return byte array of the fully read resource.
+     * @throws Exception upon error in reading the resource.
      */
     public static byte[] getResourceAsBytes(String resource) throws Exception {
         InputStream in = getResourceAsStream(resource);
@@ -157,21 +174,21 @@
      *
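+     * @param resourcePath directory containing the serialized topology and config resources.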
      * @param failOnParseError throw exception if there are unmatched files, otherwise ignore unmatched and read errors.
      * @return An array of TopologyDetails representing resource files.
-     * @throws Exception
+     * @throws Exception upon error in reading the serialized topology files.
      */
-    public static TopologyDetails[] createTopoDetailsArray(boolean failOnParseError) throws Exception {
+    public static TopologyDetails[] createTopoDetailsArray(String resourcePath, boolean failOnParseError) throws Exception {
         List<TopologyDetails> topoDetailsList = new ArrayList<>();
         List<String> errors = new ArrayList<>();
-        List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
+        List<String> resources = getResourceFiles(resourcePath);
         Map<String, String> codeResourceMap = new TreeMap<>();
         Map<String, String> confResourceMap = new HashMap<>();
-        for (int i = 0 ; i < resources.size() ; i++) {
-            String resource = resources.get(i);
+        for (String resource : resources) {
             int idxOfSlash = resource.lastIndexOf("/");
             int idxOfDash = resource.lastIndexOf("-");
             String nm = idxOfDash > idxOfSlash
-                    ? resource.substring(idxOfSlash + 1, idxOfDash)
-                    : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
+                ? resource.substring(idxOfSlash + 1, idxOfDash)
+                : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
             if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)) {
                 codeResourceMap.put(nm, resource);
             } else if (resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
@@ -192,15 +209,15 @@
                 Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB,
         };
 
-        for (String nm : codeResourceMap.keySet()) {
-            String codeResource = codeResourceMap.get(nm);
-            if (!confResourceMap.containsKey(nm)) {
-                String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, nm);
+        for (String topoId : codeResourceMap.keySet()) {
+            String codeResource = codeResourceMap.get(topoId);
+            if (!confResourceMap.containsKey(topoId)) {
+                String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, topoId);
                 errors.add(err);
                 LOG.error(err);
                 continue;
             }
-            String confResource = confResourceMap.get(nm);
+            String confResource = confResourceMap.get(topoId);
             LOG.info("Found matching topology and config files: {}, {}", codeResource, confResource);
             StormTopology stormTopology;
             try {
@@ -245,11 +262,10 @@
                 conf.put(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER, 1);
             }
 
-            String topoId = nm;
-            String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, nm);
+            String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, topoId);
 
             // conf
-            StringBuffer sb = new StringBuffer("Config for " + nm + ": ");
+            StringBuilder sb = new StringBuilder("Config for " + topoId + ": ");
             for (String param : examinedConfParams) {
                 Object val = conf.getOrDefault(param, "<null>");
                 sb.append(param).append("=").append(val).append(", ");
@@ -285,9 +301,12 @@
      */
     @Test
     public void testReadSerializedTopologiesAndConfigs() throws Exception {
-        List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
-        Assert.assertTrue("No resource files found in " + TEST_RESOURCE_PATH, !resources.isEmpty());
-        TopologyDetails[] topoDetailsArray = createTopoDetailsArray(true);
+        for (TEST_CLUSTER_NAME testClusterName : TEST_CLUSTER_NAME.values()) {
+            String resourcePath = testClusterName.getResourcePath();
+            List<String> resources = getResourceFiles(resourcePath);
+            Assert.assertFalse("No resource files found in " + resourcePath, resources.isEmpty());
+            createTopoDetailsArray(resourcePath, true);
+        }
     }
 
     /**
@@ -321,35 +340,20 @@
     }
 
     /**
-     * Create supervisors for a larger cluster configuration.
-     *
-     * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
-     * @return created supervisors.
-     */
-    private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
-        if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
-            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
-            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
-        } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
-            Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
-            return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
-        } else {
-            return createSupervisorsForCluster01(reducedSupervisorsPerRack);
-        }
-    }
-
-    /**
      * Create supervisors based on a predefined supervisor distribution modeled after an existing
      * large cluster in use.
      *
-     * @param supervisorDistributions supervisor distribution to use.
+     * @param testClusterName cluster for which the supervisors are created.
      * @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
      * @return created supervisors.
      */
     private static Map<String, SupervisorDetails> createSupervisors(
-        Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+        TEST_CLUSTER_NAME testClusterName, int reducedSupervisorsPerRack) {
+
+        Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution(testClusterName);
         Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
-        LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+        LOG.info("Cluster={}, Designed capacity: {}",
+            testClusterName.getClusterName(), SupervisorDistribution.clusterCapacity(supervisorDistributions));
 
         Map<String, SupervisorDetails> retList = new HashMap<>();
         Map<String, AtomicInteger> seenRacks = new HashMap<>();
@@ -361,7 +365,7 @@
             final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
             list.forEach(x -> {
                 int supervisorCnt = x.supervisorCnt;
-                for (int i = 0; i < supervisorCnt ; i++) {
+                for (int i = 0; i < supervisorCnt; i++) {
                     int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet();
                     int rackNum = seenRacks.size() - 1;
                     if (superInRack >= adjustedRackSupervisorCnt) {
@@ -375,97 +379,72 @@
     }
 
     /**
-     * Create supervisors for a non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use.
-     *
-     * @param reducedSupervisorsPerRack is the reduction in supervisors per rack to constrain capacity (15 is tight)
-     * @return supervisor details indexed by id
-     */
-    private static Map<String, SupervisorDetails> createSupervisorsForCluster01(int reducedSupervisorsPerRack) {
-        int numSupersPerRack = 82 - Math.abs(reducedSupervisorsPerRack);
-        int numPorts = 50;
-
-        Map<String, SupervisorDetails> retList = new HashMap<>();
-
-        for (int rack = 0 ; rack < 12 ; rack++) {
-            double cpu = 3600; // %percent
-            double mem = 178_000; // MB
-            for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-            }
-        }
-        for (int rack = 12 ; rack < 14 ; rack++) {
-            double cpu = 2400; // %percent
-            double mem = 118_100; // MB
-            for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-            }
-        }
-        for (int rack = 14 ; rack < 16 ; rack++) {
-            double cpu = 1200; // %percent
-            double mem = 42_480; // MB
-            for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
-                createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
-            }
-        }
-        return retList;
-    }
-
-    /**
      * Create a large cluster, read topologies and configuration from resource directory and schedule.
      *
-     * @throws Exception
+     * @throws Exception upon error.
      */
     @Test
     public void testLargeCluster() throws Exception {
-        Map<String, SupervisorDetails> supervisors = createSupervisors(0);
+        for (TEST_CLUSTER_NAME testClusterName : TEST_CLUSTER_NAME.values()) {
+            LOG.info("********************************************");
+            LOG.info("testLargeCluster: Start Processing cluster {}", testClusterName.getClusterName());
 
-        TopologyDetails[] topoDetailsArray = createTopoDetailsArray(false);
-        Assert.assertTrue("No topologies found", topoDetailsArray.length > 0);
-        Topologies topologies = new Topologies(topoDetailsArray);
+            String resourcePath = testClusterName.getResourcePath();
+            Map<String, SupervisorDetails> supervisors = createSupervisors(testClusterName, 0);
 
-        Config confWithDefaultStrategy = new Config();
-        confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf());
-        confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
-        confWithDefaultStrategy.put(
+            TopologyDetails[] topoDetailsArray = createTopoDetailsArray(resourcePath, false);
+            Assert.assertTrue("No topologies found for cluster " + testClusterName.getClusterName(), topoDetailsArray.length > 0);
+            Topologies topologies = new Topologies(topoDetailsArray);
+
+            Config confWithDefaultStrategy = new Config();
+            confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf());
+            confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
+            confWithDefaultStrategy.put(
                 Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
                 TestUtilsForResourceAwareScheduler.GenSupervisorsDnsToSwitchMapping.class.getName());
 
-        INimbus iNimbus = new INimbusTest();
-        Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(),
+            INimbus iNimbus = new INimbusTest();
+            Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(),
                 topologies, confWithDefaultStrategy);
 
-        scheduler = new ResourceAwareScheduler();
+            scheduler = new ResourceAwareScheduler();
 
-        List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class,
+            List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class,
                 GenericResourceAwareStrategy.class, ResourceAwareScheduler.class,
                 Cluster.class
-        );
-        Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO
-        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
-        long startTime = System.currentTimeMillis();
-        scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
-        scheduler.schedule(topologies, cluster);
-        long endTime = System.currentTimeMillis();
-        LOG.info("Scheduling Time: {} topologies in {} seconds", topoDetailsArray.length, (endTime - startTime) / 1000.0);
+            );
+            Level logLevel = Level.INFO; // switch to Level.DEBUG for verbose scheduler logging
+            classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
+            long startTime = System.currentTimeMillis();
+            scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+            scheduler.schedule(topologies, cluster);
+            long endTime = System.currentTimeMillis();
+            LOG.info("Cluster={} Scheduling Time: {} topologies in {} seconds",
+                testClusterName.getClusterName(), topoDetailsArray.length, (endTime - startTime) / 1000.0);
 
-        for (TopologyDetails td : topoDetailsArray) {
-            TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName());
-        }
+            for (TopologyDetails td : topoDetailsArray) {
+                TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName());
+            }
 
-        // Remove topology and reschedule it
-        for (int i = 0 ; i < topoDetailsArray.length ; i++) {
-            startTime = System.currentTimeMillis();
-            TopologyDetails topoDetails = topoDetailsArray[i];
-            cluster.unassign(topoDetails.getId());
-            LOG.info("({}) Removed topology {}", i, topoDetails.getName());
-            IScheduler rescheduler = new ResourceAwareScheduler();
-            rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
-            rescheduler.schedule(topologies, cluster);
-            TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName());
-            endTime = System.currentTimeMillis();
-            LOG.info("({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds", i, topoDetails.getName(), (endTime - startTime) / 1000.0);
+            // Remove topology and reschedule it
+            for (int i = 0; i < topoDetailsArray.length; i++) {
+                startTime = System.currentTimeMillis();
+                TopologyDetails topoDetails = topoDetailsArray[i];
+                cluster.unassign(topoDetails.getId());
+                LOG.info("Cluster={},  ({}) Removed topology {}", testClusterName.getClusterName(), i, topoDetails.getName());
+                IScheduler rescheduler = new ResourceAwareScheduler();
+                rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+                rescheduler.schedule(topologies, cluster);
+                TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName());
+                endTime = System.currentTimeMillis();
+                LOG.info("Cluster={}, ({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds",
+                    testClusterName.getClusterName(), i, topoDetails.getName(), (endTime - startTime) / 1000.0);
+            }
+            classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO));
+
+            LOG.info("testLargeCluster: End Processing cluster {}", testClusterName.getClusterName());
+            LOG.info("********************************************");
         }
-        classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO));
     }
 
     public static class SupervisorDistribution {
@@ -489,6 +468,54 @@
             return retVal;
         }
 
+        public static Collection<SupervisorDistribution> getSupervisorDistribution(TEST_CLUSTER_NAME testClusterName) {
+            switch (testClusterName) {
+                case TEST_CLUSTER_01:
+                    return getSupervisorDistribution01();
+                case TEST_CLUSTER_02:
+                    return getSupervisorDistribution02();
+                case TEST_CLUSTER_03:
+                default:
+                    return getSupervisorDistribution03();
+            }
+        }
+
+        private static Collection<SupervisorDistribution> getSupervisorDistribution01() {
+            int numSupersPerRack = 82;
+            int numPorts = 50;
+            int numSupersPerRackEven = numSupersPerRack / 2;
+            int numSupersPerRackOdd = numSupersPerRack - numSupersPerRackEven;
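+            // The odd half of each rack runs with 100 less CPU, matching the replaced createSupervisorsForCluster01.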
+
+            List<SupervisorDistribution> ret = new ArrayList<>();
+
+            for (int rack = 0; rack < 12; rack++) {
+                String rackId = String.format("r%03d", rack);
+                int cpu = 3600; // percent
+                int mem = 178_000; // MB
+                int adjustedCpu = cpu - 100;
+                ret.add(new SupervisorDistribution(numSupersPerRackEven, rackId, numPorts, mem, cpu));
+                ret.add(new SupervisorDistribution(numSupersPerRackOdd, rackId, numPorts, mem, adjustedCpu));
+            }
+            for (int rack = 12; rack < 14; rack++) {
+                String rackId = String.format("r%03d", rack);
+                int cpu = 2400; // percent
+                int mem = 118_100; // MB
+                int adjustedCpu = cpu - 100;
+                ret.add(new SupervisorDistribution(numSupersPerRackEven, rackId, numPorts, mem, cpu));
+                ret.add(new SupervisorDistribution(numSupersPerRackOdd, rackId, numPorts, mem, adjustedCpu));
+            }
+            for (int rack = 14; rack < 16; rack++) {
+                String rackId = String.format("r%03d", rack);
+                int cpu = 1200; // percent
+                int mem = 42_480; // MB
+                int adjustedCpu = cpu - 100;
+                ret.add(new SupervisorDistribution(numSupersPerRackEven, rackId, numPorts, mem, cpu));
+                ret.add(new SupervisorDistribution(numSupersPerRackOdd, rackId, numPorts, mem, adjustedCpu));
+            }
+            return ret;
+        }
+
         public static Collection<SupervisorDistribution> getSupervisorDistribution02() {
             return Arrays.asList(
                 // Cnt, Rack,    Slot, Mem, CPU
@@ -534,11 +556,11 @@
             Set<String> racks = new HashSet<>();
 
             for (SupervisorDistribution x: supervisorDistributions) {
-                memoryMb += (x.supervisorCnt * x.memoryMb);
-                cpuPercent += (x.supervisorCnt * x.cpuPercent);
+                memoryMb += ((long) x.supervisorCnt * x.memoryMb);
+                cpuPercent += ((long) x.supervisorCnt * x.cpuPercent);
                 supervisorCnt += x.supervisorCnt;
                 racks.add(x.rackId);
-            };
+            }
             return String.format("Cluster summary: Racks=%d, Supervisors=%d, memoryMb=%d, cpuPercent=%d",
                 racks.size(), supervisorCnt, memoryMb, cpuPercent);
         }
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
index 1ac314f..0c2a171 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
@@ -66,8 +66,6 @@
 public class TestTopologyAnonymizerUtils {
     private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
 
-    // iridiumblue -> largeCluster02 (prior largeCluster01)
-    // ebonyred -> largeCluster03
     private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/ebonyred";
     private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster03";
     public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "stormcode.ser";