blob: 95aa5c822570b32b7ff51761796f0bd97ccbc4f0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.scheduler.resource.Component;
import backtype.storm.scheduler.resource.ResourceUtils;
import backtype.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopologyDetails {
String topologyId;
Map topologyConf;
StormTopology topology;
Map<ExecutorDetails, String> executorToComponent;
int numWorkers;
//<ExecutorDetails - Task, Map<String - Type of resource, Map<String - type of that resource, Double - amount>>>
private Map<ExecutorDetails, Map<String, Double>> _resourceList;
//Max heap size for a worker used by topology
private Double topologyWorkerMaxHeapSize;
private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) {
this.topologyId = topologyId;
this.topologyConf = topologyConf;
this.topology = topology;
this.numWorkers = numWorkers;
}
public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) {
this(topologyId, topologyConf, topology, numWorkers);
this.executorToComponent = new HashMap<>(0);
if (executorToComponents != null) {
this.executorToComponent.putAll(executorToComponents);
}
this.initResourceList();
this.initConfigs();
}
public String getId() {
return topologyId;
}
public String getName() {
return (String) this.topologyConf.get(Config.TOPOLOGY_NAME);
}
public Map getConf() {
return topologyConf;
}
public int getNumWorkers() {
return numWorkers;
}
public Map<ExecutorDetails, String> getExecutorToComponent() {
return this.executorToComponent;
}
public Map<ExecutorDetails, String> selectExecutorToComponent(Collection<ExecutorDetails> executors) {
Map<ExecutorDetails, String> ret = new HashMap<>(executors.size());
for (ExecutorDetails executor : executors) {
String compId = this.executorToComponent.get(executor);
if (compId != null) {
ret.put(executor, compId);
}
}
return ret;
}
public Collection<ExecutorDetails> getExecutors() {
return this.executorToComponent.keySet();
}
private void initResourceList() {
_resourceList = new HashMap<>();
// Extract bolt memory info
if (this.topology.get_bolts() != null) {
for (Map.Entry<String, Bolt> bolt : this.topology.get_bolts().entrySet()) {
//the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
Map<String, Double> topology_resources = ResourceUtils.parseResources(bolt
.getValue().get_common().get_json_conf());
ResourceUtils.checkIntialization(topology_resources, bolt.getValue().toString(), this.topologyConf);
for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
if (bolt.getKey().equals(anExecutorToComponent.getValue())) {
_resourceList.put(anExecutorToComponent.getKey(), topology_resources);
}
}
}
}
// Extract spout memory info
if (this.topology.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spout : this.topology.get_spouts().entrySet()) {
Map<String, Double> topology_resources = ResourceUtils.parseResources(spout
.getValue().get_common().get_json_conf());
ResourceUtils.checkIntialization(topology_resources, spout.getValue().toString(), this.topologyConf);
for (Map.Entry<ExecutorDetails, String> anExecutorToComponent : executorToComponent.entrySet()) {
if (spout.getKey().equals(anExecutorToComponent.getValue())) {
_resourceList.put(anExecutorToComponent.getKey(), topology_resources);
}
}
}
} else {
LOG.warn("Topology " + topologyId + " does not seem to have any spouts!");
}
//schedule tasks that are not part of components returned from topology.get_spout or topology.getbolt (AKA sys tasks most specifically __acker tasks)
for(ExecutorDetails exec : this.getExecutors()) {
if (!_resourceList.containsKey(exec)) {
LOG.debug(
"Scheduling {} {} with memory requirement as 'on heap' - {} and 'off heap' - {} and CPU requirement as {}",
this.getExecutorToComponent().get(exec),
exec,
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
this.topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
this.addDefaultResforExec(exec);
}
}
}
private List<ExecutorDetails> componentToExecs(String comp) {
List<ExecutorDetails> execs = new ArrayList<>();
for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
if (entry.getValue().equals(comp)) {
execs.add(entry.getKey());
}
}
return execs;
}
/**
* Returns a representation of the non-system components of the topology graph
* Each Component object in the returning map is populated with the list of its
* parents, children and execs assigned to that component.
* @return a map of components
*/
public Map<String, Component> getComponents() {
Map<String, Component> all_comp = new HashMap<>();
StormTopology storm_topo = this.topology;
// spouts
if (storm_topo.get_spouts() != null) {
for (Map.Entry<String, SpoutSpec> spoutEntry : storm_topo
.get_spouts().entrySet()) {
if (!Utils.isSystemId(spoutEntry.getKey())) {
Component newComp;
if (all_comp.containsKey(spoutEntry.getKey())) {
newComp = all_comp.get(spoutEntry.getKey());
newComp.execs = componentToExecs(newComp.id);
} else {
newComp = new Component(spoutEntry.getKey());
newComp.execs = componentToExecs(newComp.id);
all_comp.put(spoutEntry.getKey(), newComp);
}
newComp.type = Component.ComponentType.SPOUT;
for (Map.Entry<GlobalStreamId, Grouping> spoutInput : spoutEntry
.getValue().get_common().get_inputs()
.entrySet()) {
newComp.parents.add(spoutInput.getKey()
.get_componentId());
if (!all_comp.containsKey(spoutInput
.getKey().get_componentId())) {
all_comp.put(spoutInput.getKey()
.get_componentId(),
new Component(spoutInput.getKey()
.get_componentId()));
}
all_comp.get(spoutInput.getKey()
.get_componentId()).children.add(spoutEntry
.getKey());
}
}
}
}
// bolts
if (storm_topo.get_bolts() != null) {
for (Map.Entry<String, Bolt> boltEntry : storm_topo.get_bolts()
.entrySet()) {
if (!Utils.isSystemId(boltEntry.getKey())) {
Component newComp;
if (all_comp.containsKey(boltEntry.getKey())) {
newComp = all_comp.get(boltEntry.getKey());
newComp.execs = componentToExecs(newComp.id);
} else {
newComp = new Component(boltEntry.getKey());
newComp.execs = componentToExecs(newComp.id);
all_comp.put(boltEntry.getKey(), newComp);
}
newComp.type = Component.ComponentType.BOLT;
for (Map.Entry<GlobalStreamId, Grouping> boltInput : boltEntry
.getValue().get_common().get_inputs()
.entrySet()) {
newComp.parents.add(boltInput.getKey()
.get_componentId());
if (!all_comp.containsKey(boltInput
.getKey().get_componentId())) {
all_comp.put(boltInput.getKey()
.get_componentId(),
new Component(boltInput.getKey()
.get_componentId()));
}
all_comp.get(boltInput.getKey()
.get_componentId()).children.add(boltEntry
.getKey());
}
}
}
}
return all_comp;
}
/**
* Gets the on heap memory requirement for a
* certain task within a topology
* @param exec the executor the inquiry is concerning.
* @return Double the amount of on heap memory
* requirement for this exec in topology topoId.
*/
public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
Double ret = null;
if (hasExecInTopo(exec)) {
ret = _resourceList
.get(exec)
.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
}
return ret;
}
/**
* Gets the off heap memory requirement for a
* certain task within a topology
* @param exec the executor the inquiry is concerning.
* @return Double the amount of off heap memory
* requirement for this exec in topology topoId.
*/
public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
Double ret = null;
if (hasExecInTopo(exec)) {
ret = _resourceList
.get(exec)
.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
}
return ret;
}
/**
* Gets the total memory requirement for a task
* @param exec the executor the inquiry is concerning.
* @return Double the total memory requirement
* for this exec in topology topoId.
*/
public Double getTotalMemReqTask(ExecutorDetails exec) {
if (hasExecInTopo(exec)) {
return getOffHeapMemoryRequirement(exec)
+ getOnHeapMemoryRequirement(exec);
}
return null;
}
/**
* Gets the total memory resource list for a
* set of tasks that is part of a topology.
* @return Map<ExecutorDetails, Double> a map of the total memory requirement
* for all tasks in topology topoId.
*/
public Map<ExecutorDetails, Double> getTotalMemoryResourceList() {
Map<ExecutorDetails, Double> ret = new HashMap<>();
for (ExecutorDetails exec : _resourceList.keySet()) {
ret.put(exec, getTotalMemReqTask(exec));
}
return ret;
}
/**
* Get the total CPU requirement for executor
* @return Double the total about of cpu requirement for executor
*/
public Double getTotalCpuReqTask(ExecutorDetails exec) {
if (hasExecInTopo(exec)) {
return _resourceList
.get(exec)
.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
}
return null;
}
/**
* Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
* We reserve the right to change them.
*
* @return the total on-heap memory requested for this topology
*/
public Double getTotalRequestedMemOnHeap() {
Double total_memonheap = 0.0;
for (ExecutorDetails exec : this.getExecutors()) {
Double exec_mem = getOnHeapMemoryRequirement(exec);
if (exec_mem != null) {
total_memonheap += exec_mem;
}
}
return total_memonheap;
}
/**
* Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
* We reserve the right to change them.
*
* @return the total off-heap memory requested for this topology
*/
public Double getTotalRequestedMemOffHeap() {
Double total_memoffheap = 0.0;
for (ExecutorDetails exec : this.getExecutors()) {
Double exec_mem = getOffHeapMemoryRequirement(exec);
if (exec_mem != null) {
total_memoffheap += exec_mem;
}
}
return total_memoffheap;
}
/**
* Note: The public API relevant to resource aware scheduling is unstable as of May 2015.
* We reserve the right to change them.
*
* @return the total cpu requested for this topology
*/
public Double getTotalRequestedCpu() {
Double total_cpu = 0.0;
for (ExecutorDetails exec : this.getExecutors()) {
Double exec_cpu = getTotalCpuReqTask(exec);
if (exec_cpu != null) {
total_cpu += exec_cpu;
}
}
return total_cpu;
}
/**
* get the resources requirements for a executor
* @param exec
* @return a map containing the resource requirements for this exec
*/
public Map<String, Double> getTaskResourceReqList(ExecutorDetails exec) {
if (hasExecInTopo(exec)) {
return _resourceList.get(exec);
}
return null;
}
/**
* Checks if a executor is part of this topology
* @return Boolean whether or not a certain ExecutorDetail is included in the _resourceList.
*/
public boolean hasExecInTopo(ExecutorDetails exec) {
return _resourceList != null && _resourceList.containsKey(exec);
}
/**
* add resource requirements for a executor
*/
public void addResourcesForExec(ExecutorDetails exec, Map<String, Double> resourceList) {
if (hasExecInTopo(exec)) {
LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
return;
}
_resourceList.put(exec, resourceList);
}
/**
* Add default resource requirements for a executor
*/
public void addDefaultResforExec(ExecutorDetails exec) {
Map<String, Double> defaultResourceList = new HashMap<>();
defaultResourceList.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT,
Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT), null));
defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB), null));
defaultResourceList.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
Utils.getDouble(topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB), null));
LOG.debug("Scheduling Executor: {} with memory requirement as onHeap: {} - offHeap: {} " +
"and CPU requirement: {}",
exec, topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB),
topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB),
topologyConf.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT));
addResourcesForExec(exec, defaultResourceList);
}
/**
* initializes the scheduler member variable by extracting what scheduler
* this topology is going to use from topologyConf
*/
private void initConfigs() {
this.topologyWorkerMaxHeapSize = Utils.getDouble(this.topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
}
/**
* Get the max heap size for a worker used by this topology
* @return the worker max heap size
*/
public Double getTopologyWorkerMaxHeapSize() {
return this.topologyWorkerMaxHeapSize;
}
}