| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.storm.scheduler.resource.strategies.scheduling; |
| |
| import org.apache.logging.log4j.Level; |
| import org.apache.logging.log4j.core.config.Configurator; |
| import org.apache.storm.Config; |
| import org.apache.storm.DaemonConfig; |
| import org.apache.storm.generated.StormTopology; |
| import org.apache.storm.metric.StormMetricsRegistry; |
| import org.apache.storm.scheduler.Cluster; |
| import org.apache.storm.scheduler.ExecutorDetails; |
| import org.apache.storm.scheduler.INimbus; |
| import org.apache.storm.scheduler.IScheduler; |
| import org.apache.storm.scheduler.SupervisorDetails; |
| import org.apache.storm.scheduler.Topologies; |
| import org.apache.storm.scheduler.TopologyDetails; |
| import org.apache.storm.scheduler.WorkerSlot; |
| import org.apache.storm.scheduler.resource.ResourceAwareScheduler; |
| import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler; |
| import org.apache.storm.scheduler.resource.normalization.NormalizedResources; |
| import org.apache.storm.scheduler.resource.normalization.NormalizedResourcesExtension; |
| import org.apache.storm.scheduler.resource.normalization.ResourceMetrics; |
| import org.apache.storm.utils.Time; |
| import org.apache.storm.utils.Utils; |
| import org.junit.Assert; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.extension.ExtendWith; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| @ExtendWith({NormalizedResourcesExtension.class}) |
| public class TestLargeCluster { |
| private static final Logger LOG = LoggerFactory.getLogger(TestLargeCluster.class); |
| |
| public static final String TEST_CLUSTER_01 = "largeCluster01"; |
| public static final String TEST_CLUSTER_02 = "largeCluster02"; |
| |
| public static final String TEST_CLUSTER_NAME = TEST_CLUSTER_02; |
| public static final String TEST_RESOURCE_PATH = "clusterconf/" + TEST_CLUSTER_NAME; |
| public static final String COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING = "code.ser"; |
| public static final String COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING = "conf.ser"; |
| |
| private static IScheduler scheduler = null; |
| |
| @AfterEach |
| public void cleanup() { |
| if (scheduler != null) { |
| scheduler.cleanup(); |
| scheduler = null; |
| } |
| } |
| |
| /** |
| * Get the list of serialized topology (*code.ser) and configuration (*conf.ser) |
| * resource files in the path. The resources are sorted so that paired topology and conf |
| * files are sequential. Unpaired files may be ignored by the caller. |
| * |
| * @param path directory in which resources exist. |
| * @return |
| * @throws IOException |
| */ |
| public static List<String> getResourceFiles(String path) throws IOException { |
| List<String> fileNames = new ArrayList<>(); |
| |
| try ( |
| InputStream in = getResourceAsStream(path); |
| BufferedReader br = new BufferedReader(new InputStreamReader(in)) |
| ) { |
| String resource; |
| |
| while ((resource = br.readLine()) != null) { |
| if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING) |
| || resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) { |
| fileNames.add(path + "/" + resource); |
| } |
| } |
| Collections.sort(fileNames); |
| } |
| return fileNames; |
| } |
| |
| /** |
| * InputStream to read the fully qualified resource path. |
| * |
| * @param resource |
| * @return |
| */ |
| public static InputStream getResourceAsStream(String resource) { |
| final InputStream in = getContextClassLoader().getResourceAsStream(resource); |
| return in == null ? ClassLoader.getSystemClassLoader().getResourceAsStream(resource) : in; |
| } |
| |
| /** |
| * Read the contents of the fully qualified resource path. |
| * |
| * @param resource |
| * @return |
| * @throws Exception |
| */ |
| public static byte[] getResourceAsBytes(String resource) throws Exception { |
| InputStream in = getResourceAsStream(resource); |
| if (in == null) { |
| return null; |
| } |
| try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { |
| while (in.available() > 0) { |
| out.write(in.read()); |
| } |
| return out.toByteArray(); |
| } |
| } |
| |
| public static ClassLoader getContextClassLoader() { |
| return Thread.currentThread().getContextClassLoader(); |
| } |
| |
| /** |
| * Create an array of TopologyDetails by reading serialized files for topology and configuration in the |
| * resource path. Skip topologies with no executors/components. |
| * |
| * @param failOnParseError throw exception if there are unmatched files, otherwise ignore unmatched and read errors. |
| * @return An array of TopologyDetails representing resource files. |
| * @throws Exception |
| */ |
| public static TopologyDetails[] createTopoDetailsArray(boolean failOnParseError) throws Exception { |
| List<TopologyDetails> topoDetailsList = new ArrayList<>(); |
| List<String> errors = new ArrayList<>(); |
| List<String> resources = getResourceFiles(TEST_RESOURCE_PATH); |
| Map<String, String> codeResourceMap = new TreeMap<>(); |
| Map<String, String> confResourceMap = new HashMap<>(); |
| for (int i = 0 ; i < resources.size() ; i++) { |
| String resource = resources.get(i); |
| int idxOfSlash = resource.lastIndexOf("/"); |
| int idxOfDash = resource.lastIndexOf("-"); |
| String nm = idxOfDash > idxOfSlash |
| ? resource.substring(idxOfSlash + 1, idxOfDash) |
| : resource.substring(idxOfSlash + 1, resource.length() - COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING.length()); |
| if (resource.endsWith(COMPRESSED_SERIALIZED_TOPOLOGY_FILENAME_ENDING)) { |
| codeResourceMap.put(nm, resource); |
| } else if (resource.endsWith(COMPRESSED_SERIALIZED_CONFIG_FILENAME_ENDING)) { |
| confResourceMap.put(nm, resource); |
| } else { |
| LOG.info("Ignoring unsupported resource file " + resource); |
| } |
| } |
| String[] examinedConfParams = { |
| Config.TOPOLOGY_NAME, |
| Config.TOPOLOGY_SCHEDULER_STRATEGY, |
| Config.TOPOLOGY_PRIORITY, |
| Config.TOPOLOGY_WORKERS, |
| Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, |
| Config.TOPOLOGY_SUBMITTER_USER, |
| Config.TOPOLOGY_ACKER_CPU_PCORE_PERCENT, |
| Config.TOPOLOGY_ACKER_RESOURCES_OFFHEAP_MEMORY_MB, |
| Config.TOPOLOGY_ACKER_RESOURCES_ONHEAP_MEMORY_MB, |
| }; |
| |
| for (String nm : codeResourceMap.keySet()) { |
| String codeResource = codeResourceMap.get(nm); |
| if (!confResourceMap.containsKey(nm)) { |
| String err = String.format("Ignoring topology file %s because of missing config file for %s", codeResource, nm); |
| errors.add(err); |
| LOG.error(err); |
| continue; |
| } |
| String confResource = confResourceMap.get(nm); |
| LOG.info("Found matching topology and config files: {}, {}", codeResource, confResource); |
| StormTopology stormTopology; |
| try { |
| stormTopology = Utils.deserialize(getResourceAsBytes(codeResource), StormTopology.class); |
| } catch (Exception ex) { |
| String err = String.format("Cannot read topology from resource %s", codeResource); |
| errors.add(err); |
| LOG.error(err, ex); |
| continue; |
| } |
| |
| Map<String, Object> conf; |
| try { |
| conf = Utils.fromCompressedJsonConf(getResourceAsBytes(confResource)); |
| } catch (RuntimeException | IOException ex) { |
| String err = String.format("Cannot read configuration from resource %s", confResource); |
| errors.add(err); |
| LOG.error(err, ex); |
| continue; |
| } |
| // fix 0.10 conf class names |
| String[] configParamsToFix = {Config.TOPOLOGY_SCHEDULER_STRATEGY, Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, |
| DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY }; |
| for (String configParam: configParamsToFix) { |
| if (!conf.containsKey(configParam)) { |
| continue; |
| } |
| String className = (String) conf.get(configParam); |
| if (className.startsWith("backtype")) { |
| className = className.replace("backtype", "org.apache"); |
| conf.put(configParam, className); |
| } |
| } |
| // fix conf params used by ConstraintSolverStrategy |
| if (!conf.containsKey(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)) { |
| conf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, 10_000); |
| } |
| if (!conf.containsKey(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)) { |
| conf.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 10_000); |
| } |
| if (!conf.containsKey(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER)) { |
| conf.put(Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER, 1); |
| } |
| |
| String topoId = nm; |
| String topoName = (String) conf.getOrDefault(Config.TOPOLOGY_NAME, nm); |
| |
| // conf |
| StringBuffer sb = new StringBuffer("Config for " + nm + ": "); |
| for (String param : examinedConfParams) { |
| Object val = conf.getOrDefault(param, "<null>"); |
| sb.append(param).append("=").append(val).append(", "); |
| } |
| LOG.info(sb.toString()); |
| |
| // topo |
| Map<ExecutorDetails, String> execToComp = TestUtilsForResourceAwareScheduler.genExecsAndComps(stormTopology); |
| LOG.info("Topology \"{}\" spouts={}, bolts={}, execToComp size is {}", topoName, |
| stormTopology.get_spouts_size(), stormTopology.get_bolts_size(), execToComp.size()); |
| if (execToComp.isEmpty()) { |
| LOG.error("Topology \"{}\" Ignoring BAD topology with zero executors", topoName); |
| continue; |
| } |
| int numWorkers = Integer.parseInt("" + conf.getOrDefault(Config.TOPOLOGY_WORKERS, "0")); |
| TopologyDetails topo = new TopologyDetails(topoId, conf, stormTopology, numWorkers, |
| execToComp, Time.currentTimeSecs(), "user"); |
| topo.getUserTopolgyComponents(); // sanity check - normally this should not fail |
| |
| topoDetailsList.add(topo); |
| } |
| if (!errors.isEmpty() && failOnParseError) { |
| throw new Exception("Unable to parse all serialized objects\n\t" + String.join("\n\t", errors)); |
| } |
| return topoDetailsList.toArray(new TopologyDetails[0]); |
| } |
| |
| /** |
| * Check if the files in the resource directory are matched, can be read properly, and code/config files occur |
| * in matched pairs. |
| * |
| * @throws Exception showing bad and unmatched resource files. |
| */ |
| @Test |
| public void testReadSerializedTopologiesAndConfigs() throws Exception { |
| List<String> resources = getResourceFiles(TEST_RESOURCE_PATH); |
| Assert.assertTrue("No resource files found in " + TEST_RESOURCE_PATH, !resources.isEmpty()); |
| TopologyDetails[] topoDetailsArray = createTopoDetailsArray(true); |
| } |
| |
| /** |
| * Create one supervisor and add to the supervisors list. |
| * |
| * @param rack rack-number |
| * @param superInRack supervisor number in the rack |
| * @param cpu percentage |
| * @param mem in megabytes |
| * @param numPorts number of ports on this supervisor |
| * @param sups returned map os supervisors |
| */ |
| private static void createAndAddOneSupervisor( |
| int rack, int superInRack, double cpu, double mem, int numPorts, |
| Map<String, SupervisorDetails> sups) { |
| |
| List<Number> ports = new LinkedList<>(); |
| for (int p = 0; p < numPorts; p++) { |
| ports.add(p); |
| } |
| String superId = String.format("r%03ds%03d", rack, superInRack); |
| String hostId = String.format("host-%03d-rack-%03d", superInRack, rack); |
| Map<String, Double> resourceMap = new HashMap<>(); |
| resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); |
| resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); |
| resourceMap.put("network.resource.units", 50.0); |
| SupervisorDetails sup = new SupervisorDetails(superId, |
| hostId, null, ports, |
| NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(resourceMap)); |
| sups.put(sup.getId(), sup); |
| } |
| |
| /** |
| * Create supervisors for a larger cluster configuration. |
| * |
| * @param reducedSupervisorsPerRack number of supervisors to reduce in rack. |
| * @return created supervisors. |
| */ |
| private static Map<String, SupervisorDetails> createSupervisors(int reducedSupervisorsPerRack) { |
| if (TEST_CLUSTER_NAME.equals(TEST_CLUSTER_02)) { |
| return createSupervisorsForCluster02(reducedSupervisorsPerRack); |
| } else { |
| return createSupervisorsForCluster01(reducedSupervisorsPerRack); |
| } |
| } |
| |
| /** |
| * Create supervisors for a newer {@link #TEST_CLUSTER_02} cluster configuration to mimic a large cluster in use. |
| * |
| * @param reducedSupervisorsPerRack number of supervisors to reduce per rack. |
| * @return created supervisors. |
| */ |
| private static Map<String, SupervisorDetails> createSupervisorsForCluster02(int reducedSupervisorsPerRack) { |
| Collection<SupervisorDistribution> supervisorDistributions = SupervisorDistribution.getSupervisorDistribution02(); |
| Map<String, Collection<SupervisorDistribution>> byRackId = SupervisorDistribution.mapByRackId(supervisorDistributions); |
| LOG.info("Cluster={}, Designed capacity: {}", TEST_CLUSTER_NAME, SupervisorDistribution.clusterCapacity(supervisorDistributions)); |
| |
| Map<String, SupervisorDetails> retList = new HashMap<>(); |
| Map<String, AtomicInteger> seenRacks = new HashMap<>(); |
| byRackId.forEach((rackId, list) -> { |
| int tmpRackSupervisorCnt = list.stream().mapToInt(x -> x.supervisorCnt).sum() - Math.abs(reducedSupervisorsPerRack); |
| if (tmpRackSupervisorCnt > Math.abs(reducedSupervisorsPerRack)) { |
| tmpRackSupervisorCnt -= Math.abs(reducedSupervisorsPerRack); |
| } |
| final int adjustedRackSupervisorCnt = tmpRackSupervisorCnt; |
| list.forEach(x -> { |
| int supervisorCnt = x.supervisorCnt; |
| for (int i = 0; i < supervisorCnt ; i++) { |
| int superInRack = seenRacks.computeIfAbsent(rackId, z -> new AtomicInteger(-1)).incrementAndGet(); |
| int rackNum = seenRacks.size() - 1; |
| if (superInRack >= adjustedRackSupervisorCnt) { |
| continue; |
| } |
| createAndAddOneSupervisor(rackNum, superInRack, x.cpuPercent, x.memoryMb, x.slotCnt, retList); |
| } |
| }); |
| }); |
| return retList; |
| } |
| |
| /** |
| * Create supervisors for a non-uniform supervisor distribution closely (but not exactly) mimics a large cluster in use. |
| * |
| * @param reducedSupervisorsPerRack is the reduction in supervisors per rack to constrain capacity (15 is tight) |
| * @return supervisor details indexed by id |
| */ |
| private static Map<String, SupervisorDetails> createSupervisorsForCluster01(int reducedSupervisorsPerRack) { |
| int numSupersPerRack = 82 - Math.abs(reducedSupervisorsPerRack); |
| int numPorts = 50; |
| |
| Map<String, SupervisorDetails> retList = new HashMap<>(); |
| |
| for (int rack = 0 ; rack < 12 ; rack++) { |
| double cpu = 3600; // %percent |
| double mem = 178_000; // MB |
| for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) { |
| createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList); |
| } |
| } |
| for (int rack = 12 ; rack < 14 ; rack++) { |
| double cpu = 2400; // %percent |
| double mem = 118_100; // MB |
| for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) { |
| createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList); |
| } |
| } |
| for (int rack = 14 ; rack < 16 ; rack++) { |
| double cpu = 1200; // %percent |
| double mem = 42_480; // MB |
| for (int superInRack = 0; superInRack < numSupersPerRack ; superInRack++) { |
| createAndAddOneSupervisor(rack, superInRack, cpu - 100 * (superInRack % 2), mem, numPorts, retList); |
| } |
| } |
| return retList; |
| } |
| |
| /** |
| * Create a large cluster, read topologies and configuration from resource directory and schedule. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testLargeCluster() throws Exception { |
| Map<String, SupervisorDetails> supervisors = createSupervisors(0); |
| |
| TopologyDetails[] topoDetailsArray = createTopoDetailsArray(false); |
| Assert.assertTrue("No topologies found", topoDetailsArray.length > 0); |
| Topologies topologies = new Topologies(topoDetailsArray); |
| |
| Config confWithDefaultStrategy = new Config(); |
| confWithDefaultStrategy.putAll(topoDetailsArray[0].getConf()); |
| confWithDefaultStrategy.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName()); |
| confWithDefaultStrategy.put( |
| Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, |
| TestUtilsForResourceAwareScheduler.GenSupervisorsDnsToSwitchMapping.class.getName()); |
| |
| INimbus iNimbus = new INimbusTest(); |
| Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supervisors, new HashMap<>(), |
| topologies, confWithDefaultStrategy); |
| |
| scheduler = new ResourceAwareScheduler(); |
| |
| List<Class> classesToDebug = Arrays.asList(DefaultResourceAwareStrategy.class, |
| GenericResourceAwareStrategy.class, ResourceAwareScheduler.class, |
| Cluster.class |
| ); |
| Level logLevel = Level.INFO ; // switch to Level.DEBUG for verbose otherwise Level.INFO |
| classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), logLevel)); |
| long startTime = System.currentTimeMillis(); |
| scheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry()); |
| scheduler.schedule(topologies, cluster); |
| long endTime = System.currentTimeMillis(); |
| LOG.info("Scheduling Time: {} topologies in {} seconds", topoDetailsArray.length, (endTime - startTime) / 1000.0); |
| |
| for (TopologyDetails td : topoDetailsArray) { |
| TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, td.getName()); |
| } |
| |
| // Remove topology and reschedule it |
| for (int i = 0 ; i < topoDetailsArray.length ; i++) { |
| startTime = System.currentTimeMillis(); |
| TopologyDetails topoDetails = topoDetailsArray[i]; |
| cluster.unassign(topoDetails.getId()); |
| LOG.info("({}) Removed topology {}", i, topoDetails.getName()); |
| IScheduler rescheduler = new ResourceAwareScheduler(); |
| rescheduler.prepare(confWithDefaultStrategy, new StormMetricsRegistry()); |
| rescheduler.schedule(topologies, cluster); |
| TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, topoDetails.getName()); |
| endTime = System.currentTimeMillis(); |
| LOG.info("({}) Scheduling Time: Removed topology {} and rescheduled in {} seconds", i, topoDetails.getName(), (endTime - startTime) / 1000.0); |
| } |
| classesToDebug.forEach(x -> Configurator.setLevel(x.getName(), Level.INFO)); |
| } |
| |
| public static class SupervisorDistribution { |
| final String rackId; |
| final int supervisorCnt; |
| final int slotCnt; |
| final int memoryMb; |
| final int cpuPercent; |
| |
| public SupervisorDistribution(int supervisorCnt, String rackId, int slotCnt, int memoryMb, int cpuPercent) { |
| this.rackId = rackId; |
| this.supervisorCnt = supervisorCnt; |
| this.slotCnt = slotCnt; |
| this.memoryMb = memoryMb; |
| this.cpuPercent = cpuPercent; |
| } |
| |
| public static Map<String, Collection<SupervisorDistribution>> mapByRackId(Collection<SupervisorDistribution> supervisors) { |
| Map<String, Collection<SupervisorDistribution>> retVal = new HashMap<>(); |
| supervisors.forEach(x -> retVal.computeIfAbsent(x.rackId, rackId -> new ArrayList<>()).add(x)); |
| return retVal; |
| } |
| |
| public static Collection<SupervisorDistribution> getSupervisorDistribution02() { |
| return Arrays.asList( |
| // Cnt, Rack, Slot, Mem, CPU |
| new SupervisorDistribution(78, "r001", 12, 42461, 1100), |
| new SupervisorDistribution(146, "r002", 36, 181362, 3500), |
| new SupervisorDistribution(18, "r003", 36, 181362, 3500), |
| new SupervisorDistribution(120, "r004", 36, 181362, 3500), |
| new SupervisorDistribution(24, "r005", 36, 181362, 3500), |
| new SupervisorDistribution(16, "r005", 48, 177748, 4700), |
| new SupervisorDistribution(12, "r006", 18, 88305, 1800), |
| new SupervisorDistribution(368, "r006", 36, 181205, 3500), |
| new SupervisorDistribution(62, "r007", 48, 177748, 4700), |
| new SupervisorDistribution(50, "r008", 36, 181348, 3500), |
| new SupervisorDistribution(64, "r008", 48, 177748, 4700), |
| new SupervisorDistribution(74, "r009", 48, 177748, 4700), |
| new SupervisorDistribution(74, "r010", 48, 177748, 4700), |
| new SupervisorDistribution(10, "r011", 48, 177748, 4700), |
| new SupervisorDistribution(78, "r012", 24, 120688, 2300), |
| new SupervisorDistribution(150, "r013", 48, 177748, 4700), |
| new SupervisorDistribution(76, "r014", 36, 181362, 3500), |
| new SupervisorDistribution(38, "r015", 48, 174431, 4700), |
| new SupervisorDistribution(78, "r016", 36, 181375, 3500), |
| new SupervisorDistribution(72, "r017", 36, 181362, 3500), |
| new SupervisorDistribution(80, "r018", 36, 181362, 3500), |
| new SupervisorDistribution(76, "r019", 36, 181362, 3500), |
| new SupervisorDistribution(78, "r020", 24, 120696, 2300), |
| new SupervisorDistribution(80, "r021", 24, 120696, 2300) |
| ); |
| } |
| |
| public static String clusterCapacity(Collection<SupervisorDistribution> supervisorDistributions) { |
| long cpuPercent = 0; |
| long memoryMb = 0; |
| int supervisorCnt = 0; |
| Set<String> racks = new HashSet<>(); |
| |
| for (SupervisorDistribution x: supervisorDistributions) { |
| memoryMb += (x.supervisorCnt * x.memoryMb); |
| cpuPercent += (x.supervisorCnt * x.cpuPercent); |
| supervisorCnt += x.supervisorCnt; |
| racks.add(x.rackId); |
| }; |
| return String.format("Cluster summary: Racks=%d, Supervisors=%d, memoryMb=%d, cpuPercent=%d", |
| racks.size(), supervisorCnt, memoryMb, cpuPercent); |
| } |
| } |
| |
| public static class INimbusTest implements INimbus { |
| @Override |
| public void prepare(Map<String, Object> topoConf, String schedulerLocalDir) { |
| |
| } |
| |
| @Override |
| public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, |
| Topologies topologies, Set<String> topologiesMissingAssignments) { |
| //return null; |
| Set<WorkerSlot> ret = new HashSet<>(); |
| for (SupervisorDetails sd : existingSupervisors) { |
| String id = sd.getId(); |
| for (Number port : (Collection<Number>) sd.getMeta()) { |
| ret.add(new WorkerSlot(id, port)); |
| } |
| } |
| return ret; |
| } |
| |
| @Override |
| public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId) { |
| |
| } |
| |
| @Override |
| public String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId) { |
| if (existingSupervisors.containsKey(nodeId)) { |
| return existingSupervisors.get(nodeId).getHost(); |
| } |
| return null; |
| } |
| |
| @Override |
| public IScheduler getForcedScheduler() { |
| return null; |
| } |
| } |
| } |