/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.RebalanceOptions;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.scheduler.resource.ResourceAwareScheduler;
import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.thrift.TException;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
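
/**
 * Verifies that rebalancing a running topology with per-component resource overrides and a
 * topology conf override updates the scheduled topology accordingly.
 */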
public class TestRebalance {

    static final int SLEEP_TIME_BETWEEN_RETRY = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(TestRebalance.class);
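
    /**
     * Returns the id of the topology with the given name, or null if no such topology is running.
     */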
    public static String topoNameToId(String topoName, ILocalCluster cluster) throws TException {
        for (TopologySummary topoSum : cluster.getClusterInfo().get_topologies()) {
            if (topoSum.get_name().equals(topoName)) {
                return topoSum.get_id();
            }
        }
        return null;
    }
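
    /**
     * Submits a topology under the resource aware scheduler, rebalances it with per-component
     * resource overrides and a topology conf override, then asserts that the updated values show
     * up in the topology conf and in the spout's component resources.
     */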
    @Test
    public void testRebalanceTopologyResourcesAndConfigs() throws Exception {
        LOG.info("Starting local cluster...");

        Config conf = new Config();
        conf.put(DaemonConfig.STORM_SCHEDULER, ResourceAwareScheduler.class.getName());
        conf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
        conf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, DefaultResourceAwareStrategy.class.getName());
        conf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 10.0);
        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 100.0);
        conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);

        Map<String, Double> resourcesMap = new HashMap<>();
        resourcesMap.put("gpu.count", 5.0);
        conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP, resourcesMap);
        try (ILocalCluster cluster = new LocalCluster.Builder().withDaemonConf(conf).build()) {
            TopologyBuilder builder = new TopologyBuilder();
            SpoutDeclarer s1 = builder.setSpout("spout-1", new TestUtilsForResourceAwareScheduler.TestSpout(), 2);
            BoltDeclarer b1 = builder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(), 2)
                .shuffleGrouping("spout-1");
            BoltDeclarer b2 = builder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(), 2)
                .shuffleGrouping("bolt-1");
            StormTopology stormTopology = builder.createTopology();

            LOG.info("submitting topologies....");
            String topoName = "topo1";
            cluster.submitTopology(topoName, new HashMap<>(), stormTopology);
            waitTopologyScheduled(topoName, cluster, 20);
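
            // Override spout-1's on-heap memory, CPU, and generic gpu.count resources, and raise
            // the worker max heap size through a topology conf override.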
            RebalanceOptions opts = new RebalanceOptions();
            Map<String, Map<String, Double>> resources = new HashMap<>();
            resources.put("spout-1", new HashMap<>());
            resources.get("spout-1").put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 120.0);
            resources.get("spout-1").put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 25.0);
            resources.get("spout-1").put("gpu.count", 5.0);

            opts.set_topology_resources_overrides(resources);
            opts.set_wait_secs(0);

            JSONObject jsonObject = new JSONObject();
            jsonObject.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 768.0);
            opts.set_topology_conf_overrides(jsonObject.toJSONString());

            LOG.info("rebalancing....");
            cluster.rebalance(topoName, opts);
            waitTopologyScheduled(topoName, cluster, 10);
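
            // The rebalance takes effect asynchronously, so poll the topology conf until the
            // worker max heap override becomes visible, giving up after a few retries.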
            boolean topologyUpdated = false;
            JSONParser parser = new JSONParser();
            for (int i = 0; i < 5; i++) {
                Utils.sleep(SLEEP_TIME_BETWEEN_RETRY);
                String confRaw = cluster.getTopologyConf(topoNameToId(topoName, cluster));
                JSONObject readConf = (JSONObject) parser.parse(confRaw);
                if (768.0 == (double) readConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) {
                    topologyUpdated = true;
                    break;
                }
            }
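
            // Read the spout's component conf back from the rebalanced topology and check that the
            // resource overrides were applied.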
            StormTopology readStormTopology = cluster.getTopology(topoNameToId(topoName, cluster));
            String componentConfRaw = readStormTopology.get_spouts().get("spout-1").get_common().get_json_conf();
            JSONObject readTopologyConf = (JSONObject) parser.parse(componentConfRaw);
            Map<String, Double> componentResources =
                (Map<String, Double>) readTopologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_MAP);

            assertTrue("Topology has been updated", topologyUpdated);
            assertEquals("Updated CPU correct", 25.0, componentResources.get(Constants.COMMON_CPU_RESOURCE_NAME), 0.001);
            assertEquals("Updated Memory correct", 120.0, componentResources.get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME), 0.001);
            assertEquals("Updated Generic resource correct", 5.0, componentResources.get("gpu.count"), 0.001);
        }
    }
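
    /**
     * Waits until the topology reports as scheduled, retrying up to {@code retryAttempts} times
     * with a short sleep between attempts, and throws a RuntimeException on timeout.
     */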
    public void waitTopologyScheduled(String topoName, ILocalCluster cluster, int retryAttempts) throws TException {
        for (int i = 0; i < retryAttempts; i++) {
            if (checkTopologyScheduled(topoName, cluster)) {
                //sleep to prevent race conditions
                Utils.sleep(SLEEP_TIME_BETWEEN_RETRY);
                return;
            }
            Utils.sleep(SLEEP_TIME_BETWEEN_RETRY);
        }
        throw new RuntimeException("Timed out waiting for topology " + topoName + " to be scheduled!");
    }
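
    /**
     * Returns true if the topology is up, its status is ACTIVE, and the scheduler has recorded a
     * non-empty scheduling status for it.
     */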
    public boolean checkTopologyScheduled(String topoName, ILocalCluster cluster) throws TException {
        if (checkTopologyUp(topoName, cluster)) {
            ClusterSummary sum = cluster.getClusterInfo();
            for (TopologySummary topoSum : sum.get_topologies()) {
                if (topoSum.get_name().equals(topoName)) {
                    String status = topoSum.get_status();
                    String schedStatus = topoSum.get_sched_status();
                    if (status.equals("ACTIVE") && schedStatus != null && !schedStatus.isEmpty()) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
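
    /** Returns true if a topology with the given name is present in the cluster summary. */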
    public boolean checkTopologyUp(String topoName, ILocalCluster cluster) throws TException {
        ClusterSummary sum = cluster.getClusterInfo();
        for (TopologySummary topoSum : sum.get_topologies()) {
            if (topoSum.get_name().equals(topoName)) {
                return true;
            }
        }
        return false;
    }
}