[STORM-3743] Test all clusters in a uniform manner.
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
index 3625110..b3d7584 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestLargeCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -68,12 +68,26 @@
public class TestLargeCluster {
private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class);
- public static final String TEST_CLUSTER_01 = "largeCluster01";
- public static final String TEST_CLUSTER_02 = "largeCluster02";
- public static final String TEST_CLUSTER_03 = "largeCluster03";
+ public enum TEST_CLUSTER_NAME {
+ TEST_CLUSTER_01("largeCluster01"),
+ TEST_CLUSTER_02("largeCluster02"),
+ TEST_CLUSTER_03("largeCluster03");
- public static final String TEST_CLUSTER_NAME = TEST_CLUSTER_02;
- public static final String TEST_RESOURCE_PATH = "clusterconf/" + TEST_CLUSTER_NAME;
+ private final String clusterName;
+
+ TEST_CLUSTER_NAME(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ String getClusterName() {
+ return clusterName;
+ }
+
+ String getResourcePath() {
+ return "clusterconf/" + clusterName;
+ }
+ }
+
public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "code.ser";
public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "conf.ser";
@@ -93,8 +107,8 @@
* files are sequential. Unpaired files may be ignored by the caller.
*
* @param path directory in which resources exist.
- * @return
- * @throws IOException
+ * @return list of resource file names
+ * @throws IOException upon exception in reading resources.
*/
public static List<String> getResourceFiles(String path) throws IOException {
List<String> fileNames = new ArrayList<>();
@@ -119,8 +133,8 @@
/**
* InputStream to read the fully qualified resource path.
*
- * @param resource
- * @return
+ * @param resource path to read.
+ * @return InputStream of the resource being read.
*/
public static InputStream getResourceAsStream(String resource) {
final InputStream in = getContextClassLoader().getResourceAsStream(resource);
@@ -130,9 +144,9 @@
/**
* Read the contents of the fully qualified resource path.
*
- * @param resource
- * @return
- * @throws Exception
+ * @param resource to read.
+ * @return byte array of the fully read resource.
+ * @throws Exception upon error in reading resource.
*/
public static byte[] getResourceAsBytes(String resource) throws Exception {
InputStream in = getResourceAsStream(resource);
@@ -157,21 +171,20 @@
*
* @param failOnParseError throw exception if there are unmatched files, otherwise ignore unmatched and read errors.
* @return An array of TopologyDetails representing resource files.
- * @throws Exception
+ * @throws Exception upon error in reading topology serialized files.
*/
- public static TopologyDetails[] createTopoDetailsArray(boolean failOnParseError) throws Exception {
+ public static TopologyDetails[] createTopoDetailsArray(String resourcePath, boolean failOnParseError) throws Exception {
List<TopologyDetails> topoDetailsList = new ArrayList<>();
List<String> errors = new ArrayList<>();
- List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
+ List<String> resources = getResourceFiles(resourcePath);
Map<String, String> codeResourceMap = new TreeMap<>();
Map<String, String> confResourceMap = new HashMap<>();
- for (int i = 0 ; i < resources.size() ; i++) {
- String resource = resources.get(i);
+ for (String resource : resources) {
int idxOfSlash = resource.lastIndexOf("/");
int idxOfDash = resource.lastIndexOf("-");
String nm = idxOfDash > idxOfSlash
- ? resource.substring(idxOfSlash + 1, idxOfDash)
- : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
+ ? resource.substring(idxOfSlash + 1, idxOfDash)
+ : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length());
if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)) {
codeResourceMap.put(nm, resource);
} else if (resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) {
@@ -192,15 +205,15 @@
Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB,
};
- for (String nm : codeResourceMap.keySet()) {
- String codeResource = codeResourceMap.get(nm);
- if (!confResourceMap.containsKey(nm)) {
- String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, nm);
+ for (String topoId : codeResourceMap.keySet()) {
+ String codeResource = codeResourceMap.get(topoId);
+ if (!confResourceMap.containsKey(topoId)) {
+ String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, topoId);
errors.add(err);
LOG.error(err);
continue;
}
- String confResource = confResourceMap.get(nm);
+ String confResource = confResourceMap.get(topoId);
LOG.info("Found matching topology and config files: {}, {}", codeResource, confResource);
StormTopology stormTopology;
try {
@@ -245,11 +258,10 @@
conf.put(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER, 1);
}
- String topoId = nm;
- String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, nm);
+ String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, topoId);
// conf
- StringBuffer sb = new StringBuffer("Config for " + nm + ": ");
+ StringBuilder sb = new StringBuilder("Config for " + topoId + ": ");
for (String param : examinedConfParams) {
Object val = conf.getOrDefault(param, "<null>");
sb.append(param).append("=").append(val).append(", ");
@@ -285,9 +297,12 @@
*/
@Test
public void testReadSerializedTopologiesAndConfigs() throws Exception {
- List<String> resources = getResourceFiles(TEST_RESOURCE_PATH);
- Assert.assertTrue("No resource files found in " + TEST_RESOURCE_PATH, !resources.isEmpty());
- TopologyDetails[] topoDetailsArray = createTopoDetailsArray(true);
+ for (TEST_CLUSTER_NAME testClusterName: TEST_CLUSTER_NAME.values()) {
+ String resourcePath = testClusterName.getResourcePath();
+ List<String> resources = getResourceFiles(resourcePath);
+ Assert.assertFalse("No resource files found in " + resourcePath, resources.isEmpty());
+ createTopoDetailsArray(resourcePath, true);
+ }
}
/**
@@ -321,35 +336,20 @@
}
/**
- * Create supervisors for a larger cluster configuration.
- *
- * @param reducedSupervisorsPerRack number of supervisors to reduce in rack.
- * @return created supervisors.
- */
- private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) {
- if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) {
- Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02();
- return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
- } else if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_03)) {
- Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution03();
- return createSupervisors(supervisorDistributions, reducedSupervisorsPerRack);
- } else {
- return createSupervisorsForCluster01(reducedSupervisorsPerRack);
- }
- }
-
- /**
* Create supervisors based on a predefined supervisor distribution modeled after an existing
* large cluster in use.
*
- * @param supervisorDistributions supervisor distribution to use.
+ * @param testClusterName cluster for which the supervisors are created.
* @param reducedSupervisorsPerRack number of supervisors to reduce per rack.
* @return created supervisors.
*/
private static Map<String, SupervisorDetails> createSupervisors(
- Collection<SupervisorDistribution> supervisorDistributions, int reducedSupervisorsPerRack) {
+ TEST_CLUSTER_NAME testClusterName, int reducedSupervisorsPerRack) {
+
+ Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution(testClusterName);
Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions);
- LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions));
+ LOG.info("Cluster={}, Designed capacity: {}",
+ testClusterName.getClusterName(), SupervisorDistribution.clusterCapacity(supervisorDistributions));
Map<String, SupervisorDetails> retList = new HashMap<>();
Map<String, AtomicInteger> seenRacks = new HashMap<>();
@@ -361,7 +361,7 @@
final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt;
list.forEach(x -> {
int supervisorCnt = x.supervisorCnt;
- for (int i = 0; i < supervisorCnt ; i++) {
+ for (int i = 0; i < supervisorCnt; i++) {
int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet();
int rackNum = seenRacks.size() - 1;
if (superInRack >= adjustedRackSupervisorCnt) {
@@ -375,97 +375,72 @@
}
/**
- * Create supervisors for a non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use.
- *
- * @param reducedSupervisorsPerRack is the reduction in supervisors per rack to constrain capacity (15 is tight)
- * @return supervisor details indexed by id
- */
- private static Map<String, SupervisorDetails> createSupervisorsForCluster01(int reducedSupervisorsPerRack) {
- int numSupersPerRack = 82 - Math.abs(reducedSupervisorsPerRack);
- int numPorts = 50;
-
- Map<String, SupervisorDetails> retList = new HashMap<>();
-
- for (int rack = 0 ; rack < 12 ; rack++) {
- double cpu = 3600; // %percent
- double mem = 178_000; // MB
- for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
- createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
- }
- }
- for (int rack = 12 ; rack < 14 ; rack++) {
- double cpu = 2400; // %percent
- double mem = 118_100; // MB
- for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
- createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
- }
- }
- for (int rack = 14 ; rack < 16 ; rack++) {
- double cpu = 1200; // %percent
- double mem = 42_480; // MB
- for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) {
- createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList);
- }
- }
- return retList;
- }
-
- /**
* Create a large cluster, read topologies and configuration from resource directory and schedule.
*
- * @throws Exception
+ * @throws Exception upon error.
*/
@Test
public void testLargeCluster() throws Exception {
- Map<String, SupervisorDetails> supervisors = createSupervisors(0);
+ for (TEST_CLUSTER_NAME testClusterName: TEST_CLUSTER_NAME.values()) {
+ LOG.info("********************************************");
+ LOG.info("testLargeCluster: Start Processing cluster {}", testClusterName.getClusterName());
- TopologyDetails[] topoDetailsArray = createTopoDetailsArray(false);
- Assert.assertTrue("No topologies found", topoDetailsArray.length > 0);
- Topologies topologies = new Topologies(topoDetailsArray);
+ String resourcePath = testClusterName.getResourcePath();
+ Map<String, SupervisorDetails> supervisors = createSupervisors(testClusterName, 0);
- Config confWithDefaultStrategy = new Config();
- confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf());
- confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
- confWithDefaultStrategy.put(
+ TopologyDetails[] topoDetailsArray = createTopoDetailsArray(resourcePath, false);
+ Assert.assertTrue("No topologies found for cluster " + testClusterName.getClusterName(), topoDetailsArray.length > 0);
+ Topologies topologies = new Topologies(topoDetailsArray);
+
+ Config confWithDefaultStrategy = new Config();
+ confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf());
+ confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
+ confWithDefaultStrategy.put(
Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN,
TestUtilsForResourceAwareScheduler.GenSupervisorsDnsToSwitchMapping.class.getName());
- INimbus iNimbus = new INimbusTest();
- Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(),
+ INimbus iNimbus = new INimbusTest();
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(),
topologies, confWithDefaultStrategy);
- scheduler = new ResourceAwareScheduler();
+ scheduler = new ResourceAwareScheduler();
- List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class,
+ List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class,
GenericResourceAwareStrategy.class, ResourceAwareScheduler.class,
Cluster.class
- );
- Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO
- classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
- long startTime = System.currentTimeMillis();
- scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
- scheduler.schedule(topologies, cluster);
- long endTime = System.currentTimeMillis();
- LOG.info("Scheduling Time: {} topologies in {} seconds", topoDetailsArray.length, (endTime - startTime) / 1000.0);
+ );
+ Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO
+ classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel));
+ long startTime = System.currentTimeMillis();
+ scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+ scheduler.schedule(topologies, cluster);
+ long endTime = System.currentTimeMillis();
+ LOG.info("Cluster={} Scheduling Time: {} topologies in {} seconds",
+ testClusterName.getClusterName(), topoDetailsArray.length, (endTime - startTime) / 1000.0);
- for (TopologyDetails td : topoDetailsArray) {
- TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName());
- }
+ for (TopologyDetails td : topoDetailsArray) {
+ TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName());
+ }
- // Remove topology and reschedule it
- for (int i = 0 ; i < topoDetailsArray.length ; i++) {
- startTime = System.currentTimeMillis();
- TopologyDetails topoDetails = topoDetailsArray[i];
- cluster.unassign(topoDetails.getId());
- LOG.info("({}) Removed topology {}", i, topoDetails.getName());
- IScheduler rescheduler = new ResourceAwareScheduler();
- rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
- rescheduler.schedule(topologies, cluster);
- TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName());
- endTime = System.currentTimeMillis();
- LOG.info("({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds", i, topoDetails.getName(), (endTime - startTime) / 1000.0);
+ // Remove topology and reschedule it
+ for (int i = 0 ; i < topoDetailsArray.length ; i++) {
+ startTime = System.currentTimeMillis();
+ TopologyDetails topoDetails = topoDetailsArray[i];
+ cluster.unassign(topoDetails.getId());
+ LOG.info("Cluster={}, ({}) Removed topology {}", testClusterName.getClusterName(), i, topoDetails.getName());
+ IScheduler rescheduler = new ResourceAwareScheduler();
+ rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry());
+ rescheduler.schedule(topologies, cluster);
+ TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName());
+ endTime = System.currentTimeMillis();
+ LOG.info("Cluster={}, ({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds",
+ testClusterName.getClusterName(), i, topoDetails.getName(), (endTime - startTime) / 1000.0);
+ }
+ classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO));
+
+ LOG.info("testLargeCluster: End Processing cluster {}", testClusterName.getClusterName());
+ LOG.info("********************************************");
}
- classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO));
}
public static class SupervisorDistribution {
@@ -489,6 +464,53 @@
return retVal;
}
+ public static Collection<SupervisorDistribution> getSupervisorDistribution(TEST_CLUSTER_NAME testClusterName) {
+ switch (testClusterName) {
+ case TEST_CLUSTER_01:
+ return getSupervisorDistribution01();
+ case TEST_CLUSTER_02:
+ return getSupervisorDistribution02();
+ case TEST_CLUSTER_03:
+ default:
+ return getSupervisorDistribution03();
+ }
+ }
+
+ private static Collection<SupervisorDistribution> getSupervisorDistribution01() {
+ int numSupersPerRack = 82;
+ int numPorts = 50;
+ int numSupersPerRackEven = numSupersPerRack / 2;
+ int numSupersPerRackOdd = numSupersPerRack - numSupersPerRackEven;
+
+ List<SupervisorDistribution> ret = new ArrayList<>();
+
+ for (int rack = 0; rack < 12; rack++) {
+ String rackId = String.format("r%03d", rack);
+ int cpu = 3600; // %percent
+ int mem = 178_000; // MB
+ int adjustedCpu = cpu - 100;
+ ret.add(new SupervisorDistribution(numSupersPerRackEven, rackId, numPorts, mem, cpu));
+ ret.add(new SupervisorDistribution(numSupersPerRackOdd, rackId, numPorts, mem, adjustedCpu));
+ }
+ for (int rack = 12; rack < 14; rack++) {
+ String rackId = String.format("r%03d", rack);
+ int cpu = 2400; // %percent
+ int mem = 118_100; // MB
+ int adjustedCpu = cpu - 100;
+ ret.add(new SupervisorDistribution(numSupersPerRackEven, rackId, numPorts, mem, cpu));
+ ret.add(new SupervisorDistribution(numSupersPerRackOdd, rackId, numPorts, mem, adjustedCpu));
+ }
+ for (int rack = 14; rack < 16; rack++) {
+ String rackId = String.format("r%03d", rack);
+ int cpu = 1200; // %percent
+ int mem = 42_480; // MB
+ int adjustedCpu = cpu - 100;
+ ret.add(new SupervisorDistribution(numSupersPerRackEven, rackId, numPorts, mem, cpu));
+ ret.add(new SupervisorDistribution(numSupersPerRackOdd, rackId, numPorts, mem, adjustedCpu));
+ }
+ return ret;
+ }
+
public static Collection<SupervisorDistribution> getSupervisorDistribution02() {
return Arrays.asList(
// Cnt, Rack, Slot, Mem, CPU
@@ -534,11 +556,11 @@
Set<String> racks = new HashSet<>();
for (SupervisorDistribution x: supervisorDistributions) {
- memoryMb += (x.supervisorCnt * x.memoryMb);
- cpuPercent += (x.supervisorCnt * x.cpuPercent);
+ memoryMb += ((long) x.supervisorCnt * x.memoryMb);
+ cpuPercent += ((long) x.supervisorCnt * x.cpuPercent);
supervisorCnt += x.supervisorCnt;
racks.add(x.rackId);
- };
+ }
return String.format("Cluster summary: Racks=%d, Supervisors=%d, memoryMb=%d, cpuPercent=%d",
racks.size(), supervisorCnt, memoryMb, cpuPercent);
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
index 1ac314f..0c2a171 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestTopologyAnonymizerUtils.java
@@ -66,8 +66,6 @@
public class TestTopologyAnonymizerUtils {
private static final Logger LOG = LoggerFactory.getLogger(TestTopologyAnonymizerUtils.class);
- // iridiumblue -> largeCluster02 (prior largeCluster01)
- // ebonyred -> largeCluster03
private static final String DEFAULT_ORIGINAL_RESOURCES_PATH = "clusterconf/ebonyred";
private static final String DEFAULT_ANONYMIZED_RESOURCES_OUTDIR = "src/test/resources/clusterconf/largeCluster03";
public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "stormcode.ser";