/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.loadgen;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.BoltStats;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyPageInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.generated.WorkerSummary;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ObjectReader;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Capture the structure, configuration, and measured rates/latencies of running topologies
 * so they can be replayed later by the load generation tools.
 */
public class CaptureLoad {
private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
public static final String DEFAULT_OUT_DIR = "./loadgen/";
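
    /**
     * Collect a per-stream bolt metric (e.g. process or execute latency) across all executors.
     * The supplied function pulls a time-window -> (stream -> value) map out of each executor's
     * bolt stats, and the values for the given stream id are gathered from every window.
     */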
private static List<Double> extractBoltValues(List<ExecutorSummary> summaries,
GlobalStreamId id,
Function<BoltStats, Map<String, Map<GlobalStreamId, Double>>> func) {
List<Double> ret = new ArrayList<>();
if (summaries != null) {
for (ExecutorSummary summ : summaries) {
if (summ != null && summ.is_set_stats()) {
Map<String, Map<GlobalStreamId, Double>> data = func.apply(summ.get_stats().get_specific().get_bolt());
if (data != null) {
List<Double> subvalues = data.values().stream()
.map((subMap) -> subMap.get(id))
.filter((value) -> value != null)
.collect(Collectors.toList());
ret.addAll(subvalues);
}
}
}
}
return ret;
}
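
    /**
     * Capture the structure, config, and measured stats of a single running topology
     * and translate them into a {@link TopologyLoadConf} that can be saved and replayed later.
     * @param client the Nimbus client to read from
     * @param topologySummary the summary of the topology to capture
     * @return the captured topology configuration
     * @throws Exception on any error talking to Nimbus
     */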
static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary topologySummary) throws Exception {
String topologyName = topologySummary.get_name();
LOG.info("Capturing {}...", topologyName);
String topologyId = topologySummary.get_id();
TopologyInfo info = client.getTopologyInfo(topologyId);
TopologyPageInfo tpinfo = client.getTopologyPageInfo(topologyId, ":all-time", false);
@SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
StormTopology topo = client.getUserTopology(topologyId);
//Done capturing topology information...
Map<String, Object> savedTopoConf = new HashMap<>();
Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(client.getTopologyConf(topologyId));
for (String key: TopologyLoadConf.IMPORTANT_CONF_KEYS) {
Object o = topoConf.get(key);
if (o != null) {
savedTopoConf.put(key, o);
LOG.info("with config {}: {}", key, o);
}
}
        //Let's use the number of actually scheduled workers as a way to bridge RAS and non-RAS topologies.
int numWorkers = tpinfo.get_num_workers();
if (savedTopoConf.containsKey(Config.TOPOLOGY_WORKERS)) {
numWorkers = Math.max(numWorkers, ((Number) savedTopoConf.get(Config.TOPOLOGY_WORKERS)).intValue());
}
savedTopoConf.put(Config.TOPOLOGY_WORKERS, numWorkers);
Map<String, LoadCompConf.Builder> boltBuilders = new HashMap<>();
Map<String, LoadCompConf.Builder> spoutBuilders = new HashMap<>();
List<InputStream.Builder> inputStreams = new ArrayList<>();
Map<GlobalStreamId, OutputStream.Builder> outStreams = new HashMap<>();
//Bolts
if (topo.get_bolts() != null) {
for (Map.Entry<String, Bolt> boltSpec : topo.get_bolts().entrySet()) {
String boltComp = boltSpec.getKey();
LOG.info("Found bolt {}...", boltComp);
Bolt bolt = boltSpec.getValue();
ComponentCommon common = bolt.get_common();
Map<GlobalStreamId, Grouping> inputs = common.get_inputs();
if (inputs != null) {
for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
GlobalStreamId id = input.getKey();
LOG.info("with input {}...", id);
Grouping grouping = input.getValue();
InputStream.Builder builder = new InputStream.Builder()
.withId(id.get_streamId())
.withFromComponent(id.get_componentId())
.withToComponent(boltComp)
.withGroupingType(grouping);
inputStreams.add(builder);
}
}
Map<String, StreamInfo> outputs = common.get_streams();
if (outputs != null) {
for (String name : outputs.keySet()) {
GlobalStreamId id = new GlobalStreamId(boltComp, name);
LOG.info("and output {}...", id);
OutputStream.Builder builder = new OutputStream.Builder()
.withId(name);
outStreams.put(id, builder);
}
}
LoadCompConf.Builder builder = new LoadCompConf.Builder()
.withParallelism(common.get_parallelism_hint())
.withId(boltComp);
boltBuilders.put(boltComp, builder);
}
Map<String, Map<String, Double>> boltResources = getBoltsResources(topo, topoConf);
for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
if (bd != null) {
Map<String, Double> resources = entry.getValue();
Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
if (cpu != null) {
bd.withCpuLoad(cpu);
}
Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
if (mem != null) {
bd.withMemoryLoad(mem);
}
}
}
}
//Spouts
if (topo.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spoutSpec : topo.get_spouts().entrySet()) {
String spoutComp = spoutSpec.getKey();
LOG.info("Found Spout {}...", spoutComp);
SpoutSpec spout = spoutSpec.getValue();
ComponentCommon common = spout.get_common();
Map<String, StreamInfo> outputs = common.get_streams();
if (outputs != null) {
for (String name : outputs.keySet()) {
GlobalStreamId id = new GlobalStreamId(spoutComp, name);
LOG.info("with output {}...", id);
OutputStream.Builder builder = new OutputStream.Builder()
.withId(name);
outStreams.put(id, builder);
}
}
LoadCompConf.Builder builder = new LoadCompConf.Builder()
.withParallelism(common.get_parallelism_hint())
.withId(spoutComp);
spoutBuilders.put(spoutComp, builder);
}
Map<String, Map<String, Double>> spoutResources = getSpoutsResources(topo, topoConf);
for (Map.Entry<String, Map<String, Double>> entry: spoutResources.entrySet()) {
LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey());
if (sd != null) {
Map<String, Double> resources = entry.getValue();
Double cpu = resources.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
if (cpu != null) {
sd.withCpuLoad(cpu);
}
Double mem = resources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
if (mem != null) {
sd.withMemoryLoad(mem);
}
}
}
}
//Stats...
        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
        for (ExecutorSummary executor : info.get_executors()) {
            byComponent.computeIfAbsent(executor.get_component_id(), (k) -> new ArrayList<>()).add(executor);
        }
List<InputStream> streams = new ArrayList<>(inputStreams.size());
//Compute the stats for the different input streams
for (InputStream.Builder builder : inputStreams) {
GlobalStreamId streamId = new GlobalStreamId(builder.getFromComponent(), builder.getId());
List<ExecutorSummary> summaries = byComponent.get(builder.getToComponent());
//Execute and process latency...
builder.withProcessTime(new NormalDistStats(
extractBoltValues(summaries, streamId, BoltStats::get_process_ms_avg)));
builder.withExecTime(new NormalDistStats(
extractBoltValues(summaries, streamId, BoltStats::get_execute_ms_avg)));
//InputStream is done
streams.add(builder.build());
}
        //There is a bug in some versions of Storm that returns 0 for an executor's uptime.
        // To work around it, capture each worker's uptime up front so it can be used as a fallback below.
Map<String, Integer> workerToUptime = new HashMap<>();
for (WorkerSummary ws : tpinfo.get_workers()) {
            //Key by host:port so it matches the ExecutorSummary lookup in the fallback below.
            workerToUptime.put(ws.get_host() + ":" + ws.get_port(), ws.get_uptime_secs());
}
LOG.debug("WORKER TO UPTIME {}", workerToUptime);
for (Map.Entry<GlobalStreamId, OutputStream.Builder> entry : outStreams.entrySet()) {
OutputStream.Builder builder = entry.getValue();
GlobalStreamId id = entry.getKey();
List<Double> emittedRate = new ArrayList<>();
List<ExecutorSummary> summaries = byComponent.get(id.get_componentId());
if (summaries != null) {
for (ExecutorSummary summary: summaries) {
if (summary.is_set_stats()) {
int uptime = summary.get_uptime_secs();
LOG.debug("UPTIME {}", uptime);
if (uptime <= 0) {
                            //Likely it is because of the bug, so fall back to the worker uptime captured above,
                            // clamped to at least 1 second to avoid dividing by zero below.
                            String key = summary.get_host() + ":" + summary.get_port();
                            uptime = Math.max(1, workerToUptime.getOrDefault(key, 1));
                            LOG.debug("Got fallback uptime for worker {}: {}", key, uptime);
}
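                        //Emitted counts are cumulative per time window (e.g. "600" seconds or ":all-time"),
                        // so dividing the count by the window length (capped at the uptime) estimates tuples/sec.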
for (Map.Entry<String, Map<String, Long>> statEntry : summary.get_stats().get_emitted().entrySet()) {
String timeWindow = statEntry.getKey();
long timeSecs = uptime;
try {
timeSecs = Long.valueOf(timeWindow);
} catch (NumberFormatException e) {
//Ignored...
}
timeSecs = Math.min(timeSecs, uptime);
Long count = statEntry.getValue().get(id.get_streamId());
if (count != null) {
LOG.debug("{} emitted {} for {} secs or {} tuples/sec",
id, count, timeSecs, count.doubleValue() / timeSecs);
emittedRate.add(count.doubleValue() / timeSecs);
}
}
}
}
}
builder.withRate(new NormalDistStats(emittedRate));
//The OutputStream is done
LoadCompConf.Builder comp = boltBuilders.get(id.get_componentId());
if (comp == null) {
comp = spoutBuilders.get(id.get_componentId());
}
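                //comp cannot be null here: every entry in outStreams was created from a bolt
                // or spout that was given a builder above.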
comp.withStream(builder.build());
}
List<LoadCompConf> spouts = spoutBuilders.values().stream()
.map((b) -> b.build())
.collect(Collectors.toList());
List<LoadCompConf> bolts = boltBuilders.values().stream()
.map((b) -> b.build())
.collect(Collectors.toList());
return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams);
    }

/**
* Main entry point for CaptureLoad command.
* @param args the arguments to the command
* @throws Exception on any error
*/
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addOption(Option.builder("a")
.longOpt("anonymize")
.desc("Strip out any possibly identifiable information")
.build());
options.addOption(Option.builder("o")
.longOpt("output-dir")
.argName("<file>")
.hasArg()
.desc("Where to write (defaults to " + DEFAULT_OUT_DIR + ")")
.build());
options.addOption(Option.builder("h")
.longOpt("help")
.desc("Print a help message")
.build());
CommandLineParser parser = new DefaultParser();
CommandLine cmd = null;
boolean printHelp = false;
try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.err.println("ERROR " + e.getMessage());
printHelp = true;
}
if (printHelp || cmd.hasOption('h')) {
new HelpFormatter().printHelp("CaptureLoad [options] [topologyName]*", options);
return;
}
Config conf = new Config();
int exitStatus = -1;
String outputDir = DEFAULT_OUT_DIR;
if (cmd.hasOption('o')) {
outputDir = cmd.getOptionValue('o');
}
File baseOut = new File(outputDir);
LOG.info("Will save captured topologies to {}", baseOut);
baseOut.mkdirs();
try (NimbusClient nc = NimbusClient.getConfiguredClient(conf)) {
Nimbus.Iface client = nc.getClient();
List<String> topologyNames = cmd.getArgList();
ClusterSummary clusterSummary = client.getClusterInfo();
for (TopologySummary topologySummary: clusterSummary.get_topologies()) {
if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
TopologyLoadConf capturedConf = captureTopology(client, topologySummary);
if (cmd.hasOption('a')) {
capturedConf = capturedConf.anonymize();
}
capturedConf.writeTo(new File(baseOut, capturedConf.name + ".yaml"));
}
}
exitStatus = 0;
} catch (Exception e) {
LOG.error("Error trying to capture topologies...", e);
} finally {
System.exit(exitStatus);
}
}

    //ResourceUtils.java is not available on the classpath to let us parse out the resources we want,
    // so some of the needed methods have been copied here (with a few changes to logging).
static Map<String, Map<String, Double>> getBoltsResources(StormTopology topology, Map<String, Object> topologyConf) {
Map<String, Map<String, Double>> boltResources = new HashMap<>();
if (topology.get_bolts() != null) {
for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
Map<String, Double> topologyResources = parseResources(bolt.getValue().get_common().get_json_conf());
checkInitialization(topologyResources, bolt.getValue().toString(), topologyConf);
boltResources.put(bolt.getKey(), topologyResources);
}
}
return boltResources;
}
static Map<String, Map<String, Double>> getSpoutsResources(StormTopology topology, Map<String, Object> topologyConf) {
Map<String, Map<String, Double>> spoutResources = new HashMap<>();
if (topology.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
Map<String, Double> topologyResources = parseResources(spout.getValue().get_common().get_json_conf());
checkInitialization(topologyResources, spout.getValue().toString(), topologyConf);
spoutResources.put(spout.getKey(), topologyResources);
}
}
return spoutResources;
}
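
    /**
     * Parse the component-level resource settings (on-heap memory, off-heap memory, and CPU)
     * out of a component's embedded JSON conf. The returned map contains only the keys the
     * component conf explicitly set.
     */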
static Map<String, Double> parseResources(String input) {
Map<String, Double> topologyResources = new HashMap<>();
JSONParser parser = new JSONParser();
LOG.debug("Input to parseResources {}", input);
try {
if (input != null) {
Object obj = parser.parse(input);
JSONObject jsonObject = (JSONObject) obj;
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)) {
Double topoMemOnHeap = ObjectReader
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null);
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, topoMemOnHeap);
}
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)) {
Double topoMemOffHeap = ObjectReader
.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null);
topologyResources.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, topoMemOffHeap);
}
if (jsonObject.containsKey(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT)) {
Double topoCpu = ObjectReader.getDouble(jsonObject.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT),
null);
topologyResources.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, topoCpu);
}
LOG.debug("Topology Resources {}", topologyResources);
}
        } catch (org.json.simple.parser.ParseException e) {
            LOG.error("Failed to parse component resources: " + input, e);
            //Fall through and return whatever was parsed so far, rather than a null that
            // would cause an NPE in checkInitialization.
        }
return topologyResources;
}

    /**
     * Checks if the topology's resource requirements are initialized.
     * Will modify topologyResources by replacing each captured value with the
     * topology-wide setting when one is configured.
     * @param topologyResources map of resource requirements
     * @param componentId component for which initialization is being conducted
     * @param topologyConf topology configuration
     */
public static void checkInitialization(Map<String, Double> topologyResources, String componentId, Map<String, Object> topologyConf) {
StringBuilder msgBuilder = new StringBuilder();
for (String resourceName : topologyResources.keySet()) {
msgBuilder.append(checkInitResource(topologyResources, topologyConf, resourceName));
}
        if (msgBuilder.length() > 0) {
            String resourceDefaults = msgBuilder.toString();
            LOG.debug(
                "Component {} resource requirements were taken from the topology configuration:\n Resources : {}",
                componentId, resourceDefaults);
        }
}
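
    /**
     * If the captured component conf declared this resource, replace its value with the
     * topology-level setting when one exists, and report what was changed.
     */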
    private static String checkInitResource(Map<String, Double> topologyResources, Map<String, Object> topologyConf, String resourceName) {
        StringBuilder msgBuilder = new StringBuilder();
        if (topologyResources.containsKey(resourceName)) {
            //Use ObjectReader so a non-Double conf value (e.g. an Integer) does not throw a ClassCastException.
            Double resourceValue = ObjectReader.getDouble(topologyConf.get(resourceName), null);
            if (resourceValue != null) {
                topologyResources.put(resourceName, resourceValue);
                msgBuilder.append(resourceName.substring(resourceName.lastIndexOf('.')))
                    .append(" has been set to ").append(resourceValue);
            }
        }
        return msgBuilder.toString();
    }
}