blob: 29f06e209ebcc8f723e1c94e4579d07451d98a7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ComponentType;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.SharedMemory;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The scheduler's view of a single topology: its id, configuration, {@link StormTopology} definition, the mapping
 * from executors to component ids, the requested number of workers, the launch time, the submitting user and the
 * resource request of every executor.
 */
public class TopologyDetails {
    private static final Logger LOG = LoggerFactory.getLogger(TopologyDetails.class);
    private final String topologyId;
    private final Map<String, Object> topologyConf;
    private final StormTopology topology;
    private final Map<ExecutorDetails, String> executorToComponent;
    private final int numWorkers;
    //when topology was launched, in seconds (compared against Time.currentTimeSecs() in getUpTime)
    private final int launchTime;
    private final String owner;
    private final String topoName;
    //resource request of each executor. Never null; it stays empty when the topology itself is null (used by tests).
    private final Map<ExecutorDetails, NormalizedResourceRequest> resourceList = new HashMap<>();
    //Max heap size for a worker used by topology
    private Double topologyWorkerMaxHeapSize;
    //topology priority
    private Integer topologyPriority;

    /**
     * Create details for a topology with no executors and a launch time of 0.
     */
    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers, String owner) {
        this(topologyId, topologyConf, topology, numWorkers, null, 0, owner);
    }

    /**
     * Create details for a topology with a launch time of 0.
     */
    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology,
                           int numWorkers, Map<ExecutorDetails, String> executorToComponents, String owner) {
        this(topologyId, topologyConf, topology, numWorkers, executorToComponents, 0, owner);
    }

    /**
     * Create details for a topology.
     *
     * @param topologyId the topology id
     * @param topologyConf the topology configuration; must not be null (it is read here)
     * @param topology the topology definition; may be null (tests), in which case no resource requests are computed
     * @param numWorkers the number of workers requested
     * @param executorToComponents executor to component id mapping; copied, may be null
     * @param launchTime when the topology was launched, in seconds
     * @param owner the user that submitted the topology
     */
    public TopologyDetails(String topologyId, Map<String, Object> topologyConf, StormTopology topology, int numWorkers,
                           Map<ExecutorDetails, String> executorToComponents, int launchTime, String owner) {
        this.owner = owner;
        this.topologyId = topologyId;
        this.topologyConf = topologyConf;
        this.topology = topology;
        this.numWorkers = numWorkers;
        this.executorToComponent = new HashMap<>(0);
        if (executorToComponents != null) {
            this.executorToComponent.putAll(executorToComponents);
        }
        if (topology != null) {
            initResourceList();
        }
        initConfigs();
        this.launchTime = launchTime;
        this.topoName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
    }

    public String getId() {
        return topologyId;
    }

    public String getName() {
        return topoName;
    }

    public Map<String, Object> getConf() {
        return topologyConf;
    }

    public int getNumWorkers() {
        return numWorkers;
    }

    public StormTopology getTopology() {
        return topology;
    }

    /**
     * Get the executor to component id mapping (the internal, mutable map).
     */
    public Map<ExecutorDetails, String> getExecutorToComponent() {
        return executorToComponent;
    }

    /**
     * Restrict the executor to component mapping to the given executors.
     *
     * @param executors the executors of interest
     * @return a new map containing only those of {@code executors} that belong to this topology
     */
    public Map<ExecutorDetails, String> selectExecutorToComponent(
        Collection<ExecutorDetails> executors) {
        Map<ExecutorDetails, String> ret = new HashMap<>(executors.size());
        for (ExecutorDetails executor : executors) {
            String compId = executorToComponent.get(executor);
            if (compId != null) {
                ret.put(executor, compId);
            }
        }
        return ret;
    }

    /**
     * Get all executors of this topology (a live view of the executor to component map's keys).
     */
    public Set<ExecutorDetails> getExecutors() {
        return executorToComponent.keySet();
    }

    /**
     * Compute the resource request of every executor.
     *
     * <p>Executors of declared spouts and bolts share the request built from their component's common config.
     * Every other executor (AKA system tasks, most specifically __acker) gets the topology defaults.
     */
    private void initResourceList() {
        Map<String, NormalizedResourceRequest> componentResources = new HashMap<>();
        // Extract bolt resource info
        if (topology.get_bolts() != null) {
            for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
                //the json_conf is populated by TopologyBuilder (e.g. boltDeclarer.setMemoryLoad)
                componentResources.put(bolt.getKey(),
                    new NormalizedResourceRequest(bolt.getValue().get_common(), topologyConf, bolt.getKey()));
            }
        }
        // Extract spout resource info; spouts are added after bolts so, as before, a spout wins an id clash
        if (topology.get_spouts() != null) {
            for (Map.Entry<String, SpoutSpec> spout : topology.get_spouts().entrySet()) {
                componentResources.put(spout.getKey(),
                    new NormalizedResourceRequest(spout.getValue().get_common(), topologyConf, spout.getKey()));
            }
        } else {
            LOG.warn("Topology {} does not seem to have any spouts!", topologyId);
        }
        // One lookup per executor instead of scanning all executors for each component
        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
            ExecutorDetails exec = entry.getKey();
            NormalizedResourceRequest componentRequest = componentResources.get(entry.getValue());
            if (componentRequest != null) {
                resourceList.put(exec, componentRequest);
            } else {
                //schedule tasks that are not part of components returned from topology.get_spout or
                // topology.getbolt (AKA sys tasks most specifically __acker tasks)
                LOG.debug(
                    "Scheduling {} {} with resource requirement as {}",
                    entry.getValue(),
                    exec,
                    topologyConf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
                );
                addDefaultResforExec(exec);
            }
        }
    }

    /**
     * Collect the executors assigned to a component.
     */
    private List<ExecutorDetails> componentToExecs(String comp) {
        List<ExecutorDetails> execs = new ArrayList<>();
        for (Map.Entry<ExecutorDetails, String> entry : executorToComponent.entrySet()) {
            if (comp.equals(entry.getValue())) {
                execs.add(entry.getKey());
            }
        }
        return execs;
    }

    /**
     * Collect the ids of the components a component subscribes to.
     */
    private Set<String> getInputsTo(ComponentCommon comp) {
        Set<String> ret = new HashSet<>();
        if (comp.get_inputs() != null) {
            for (GlobalStreamId globalId : comp.get_inputs().keySet()) {
                ret.add(globalId.get_componentId());
            }
        }
        return ret;
    }

    /**
     * Returns a representation of the non-system components of the topology graph. Each Component object in the
     * returning map is populated with the list of its parents, children and execs assigned to that component.
     * Links to or from system components are omitted because those components are not part of the map.
     *
     * @return a map of components (empty when there is no topology definition)
     */
    public Map<String, Component> getComponents() {
        Map<String, Component> ret = new HashMap<>();
        if (topology == null) {
            return ret;
        }
        Map<String, SpoutSpec> spouts = topology.get_spouts();
        Map<String, Bolt> bolts = topology.get_bolts();
        //Add in all of the components
        if (spouts != null) {
            for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
                String compId = entry.getKey();
                if (!Utils.isSystemId(compId)) {
                    Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId));
                    ret.put(compId, comp);
                }
            }
        }
        if (bolts != null) {
            for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
                String compId = entry.getKey();
                if (!Utils.isSystemId(compId)) {
                    Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId));
                    ret.put(compId, comp);
                }
            }
        }
        //Link the components together
        if (spouts != null) {
            for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
                linkToParents(ret, entry.getKey(), entry.getValue().get_common());
            }
        }
        if (bolts != null) {
            for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
                linkToParents(ret, entry.getKey(), entry.getValue().get_common());
            }
        }
        return ret;
    }

    /**
     * Register {@code childId} as a child of each component it subscribes to, skipping any component (child or
     * parent) that is not in {@code components}, i.e. system components.
     */
    private void linkToParents(Map<String, Component> components, String childId, ComponentCommon common) {
        Component child = components.get(childId);
        if (child == null) {
            return;
        }
        for (String parentId : getInputsTo(common)) {
            Component parent = components.get(parentId);
            if (parent != null) {
                parent.addChild(child);
            }
        }
    }

    public String getComponentFromExecutor(ExecutorDetails exec) {
        return executorToComponent.get(exec);
    }

    /**
     * Gets the on heap memory requirement for a certain task within a topology.
     *
     * @param exec the executor the inquiry is concerning.
     * @return Double the amount of on heap memory requirement for this exec in topology topoId, or null if the
     *     executor has no resource request.
     */
    public Double getOnHeapMemoryRequirement(ExecutorDetails exec) {
        Double ret = null;
        if (hasExecInTopo(exec)) {
            ret = resourceList.get(exec).getOnHeapMemoryMb();
        }
        return ret;
    }

    /**
     * Gets the off heap memory requirement for a certain task within a topology.
     *
     * @param exec the executor the inquiry is concerning.
     * @return Double the amount of off heap memory requirement for this exec in topology topoId, or null if the
     *     executor has no resource request.
     */
    public Double getOffHeapMemoryRequirement(ExecutorDetails exec) {
        Double ret = null;
        if (hasExecInTopo(exec)) {
            ret = resourceList.get(exec).getOffHeapMemoryMb();
        }
        return ret;
    }

    /**
     * Gets the total memory requirement for a task.
     *
     * @param exec the executor the inquiry is concerning.
     * @return Double the total (off heap + on heap) memory requirement for this exec in topology topoId, or null if
     *     the executor has no resource request.
     */
    public Double getTotalMemReqTask(ExecutorDetails exec) {
        if (hasExecInTopo(exec)) {
            return getOffHeapMemoryRequirement(exec)
                + getOnHeapMemoryRequirement(exec);
        }
        return null;
    }

    /**
     * Gets the shared memory requests of the components that the given executors belong to.
     *
     * @param executors all executors for a topology
     * @return the set of shared memory requests.
     */
    public Set<SharedMemory> getSharedMemoryRequests(
        Collection<ExecutorDetails> executors
    ) {
        Set<SharedMemory> ret = new HashSet<>();
        if (topology == null) {
            //topology being null is used for tests. We probably should fix that at some point,
            // but it is not trivial to do...
            return ret;
        }
        Map<String, Set<String>> compToSharedName = topology.get_component_to_shared_memory();
        Map<String, SharedMemory> sharedMemory = topology.get_shared_memory();
        if (compToSharedName == null || sharedMemory == null) {
            return ret;
        }
        Set<String> components = new HashSet<>();
        for (ExecutorDetails exec : executors) {
            String component = executorToComponent.get(exec);
            if (component != null) {
                components.add(component);
            }
        }
        for (String component : components) {
            Set<String> sharedNames = compToSharedName.get(component);
            if (sharedNames != null) {
                for (String name : sharedNames) {
                    SharedMemory request = sharedMemory.get(name);
                    // an unknown name would otherwise put null into the result
                    if (request != null) {
                        ret.add(request);
                    }
                }
            }
        }
        return ret;
    }

    /**
     * Get the total resource requirement for an executor.
     *
     * @param exec the executor to get the resources for.
     * @return the resource request for the executor, or null if it has none
     */
    public NormalizedResourceRequest getTotalResources(ExecutorDetails exec) {
        if (hasExecInTopo(exec)) {
            return this.resourceList.get(exec);
        }
        return null;
    }

    /**
     * Get an approximate total resources needed for this topology. ignores shared memory.
     *
     * @return the approximate total resources needed for this topology.
     */
    public NormalizedResourceRequest getApproximateTotalResources() {
        NormalizedResourceRequest ret = new NormalizedResourceRequest();
        for (NormalizedResourceRequest resources : resourceList.values()) {
            ret.add(resources);
        }
        return ret;
    }

    /**
     * Get approximate resources for given topology executors. ignores shared memory.
     *
     * @param execs the executors the inquiry is concerning.
     * @return the approximate resources for the executors.
     */
    public NormalizedResourceRequest getApproximateResources(Set<ExecutorDetails> execs) {
        NormalizedResourceRequest ret = new NormalizedResourceRequest();
        execs.stream()
            .filter(this::hasExecInTopo)
            .forEach(x -> ret.add(resourceList.get(x)));
        return ret;
    }

    /**
     * Get the total CPU requirement for executor.
     *
     * @param exec the executor the inquiry is concerning.
     * @return the total CPU requirement for the executor, or null if it has no resource request
     */
    public Double getTotalCpuReqTask(ExecutorDetails exec) {
        if (hasExecInTopo(exec)) {
            return resourceList.get(exec).getTotalCpu();
        }
        return null;
    }

    /**
     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. We reserve the right to change them.
     *
     * @return the total on-heap memory requested for this topology
     */
    public double getTotalRequestedMemOnHeap() {
        return getRequestedSharedOnHeap() + getRequestedNonSharedOnHeap();
    }

    /**
     * Sum the on-heap size of every shared memory request in the topology (0 when there is no topology).
     */
    public double getRequestedSharedOnHeap() {
        double ret = 0.0;
        if (topology != null && topology.is_set_shared_memory()) {
            for (SharedMemory req : topology.get_shared_memory().values()) {
                ret += req.get_on_heap();
            }
        }
        return ret;
    }

    /**
     * Sum the on-heap memory requests of all executors.
     */
    public double getRequestedNonSharedOnHeap() {
        double totalMemOnHeap = 0.0;
        for (ExecutorDetails exec : this.getExecutors()) {
            Double execMem = getOnHeapMemoryRequirement(exec);
            if (execMem != null) {
                totalMemOnHeap += execMem;
            }
        }
        return totalMemOnHeap;
    }

    /**
     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. We reserve the right to change them.
     *
     * @return the total off-heap memory requested for this topology
     */
    public double getTotalRequestedMemOffHeap() {
        return getRequestedNonSharedOffHeap() + getRequestedSharedOffHeap();
    }

    /**
     * Sum the off-heap memory requests of all executors.
     */
    public double getRequestedNonSharedOffHeap() {
        double totalMemOffHeap = 0.0;
        for (ExecutorDetails exec : this.getExecutors()) {
            Double execMem = getOffHeapMemoryRequirement(exec);
            if (execMem != null) {
                totalMemOffHeap += execMem;
            }
        }
        return totalMemOffHeap;
    }

    /**
     * Sum the worker and node off-heap sizes of every shared memory request (0 when there is no topology).
     */
    public double getRequestedSharedOffHeap() {
        double ret = 0.0;
        if (topology != null && topology.is_set_shared_memory()) {
            for (SharedMemory req : topology.get_shared_memory().values()) {
                ret += req.get_off_heap_worker() + req.get_off_heap_node();
            }
        }
        return ret;
    }

    /**
     * Note: The public API relevant to resource aware scheduling is unstable as of May 2015. We reserve the right to change them.
     *
     * @return the total cpu requested for this topology
     */
    public double getTotalRequestedCpu() {
        double totalCpu = 0.0;
        for (ExecutorDetails exec : this.getExecutors()) {
            Double execCpu = getTotalCpuReqTask(exec);
            if (execCpu != null) {
                totalCpu += execCpu;
            }
        }
        return totalCpu;
    }

    /**
     * get the resources requirements for a executor.
     *
     * @param exec executor details
     * @return the resource request for this exec, or null if it has none
     */
    public NormalizedResourceRequest getTaskResourceReqList(ExecutorDetails exec) {
        if (hasExecInTopo(exec)) {
            return resourceList.get(exec);
        }
        return null;
    }

    /**
     * Checks if a executor is part of this topology.
     *
     * @return Boolean whether or not a certain ExecutorDetail is included in the resourceList.
     */
    public boolean hasExecInTopo(ExecutorDetails exec) {
        return resourceList.containsKey(exec);
    }

    /**
     * add resource requirements for a executor. An executor that already has a request is left unchanged and a
     * warning is logged.
     *
     * @param exec the executor
     * @param resources the resource request to record for it
     */
    public void addResourcesForExec(ExecutorDetails exec, NormalizedResourceRequest resources) {
        if (hasExecInTopo(exec)) {
            LOG.warn("Executor {} already exists...ResourceList: {}", exec, getTaskResourceReqList(exec));
            return;
        }
        this.resourceList.put(exec, resources);
    }

    /**
     * Add default resource requirements for a executor, derived from the topology conf for its component.
     */
    private void addDefaultResforExec(ExecutorDetails exec) {
        String componentId = getExecutorToComponent().get(exec);
        addResourcesForExec(exec, new NormalizedResourceRequest(topologyConf, componentId));
    }

    /**
     * initializes member variables (worker max heap size and priority) from the topology conf.
     */
    private void initConfigs() {
        this.topologyWorkerMaxHeapSize =
            ObjectReader.getDouble(
                topologyConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB), null);
        this.topologyPriority =
            ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRIORITY), null);
        assert this.topologyWorkerMaxHeapSize != null;
        assert this.topologyPriority != null;
    }

    /**
     * Get the max heap size for a worker used by this topology.
     *
     * @return the worker max heap size
     */
    public Double getTopologyWorkerMaxHeapSize() {
        return topologyWorkerMaxHeapSize;
    }

    /**
     * Get the user that submitted this topology.
     */
    public String getTopologySubmitter() {
        return owner;
    }

    /**
     * get the priority of this topology.
     */
    public int getTopologyPriority() {
        return topologyPriority;
    }

    /**
     * Get the timestamp of when this topology was launched.
     */
    public int getLaunchTime() {
        return launchTime;
    }

    /**
     * Get how long this topology has been executing, in seconds.
     */
    public int getUpTime() {
        return Time.currentTimeSecs() - launchTime;
    }

    @Override
    public String toString() {
        return "Name: "
            + getName()
            + " id: "
            + getId()
            + " Priority: "
            + getTopologyPriority()
            + " Uptime: "
            + getUpTime()
            + " CPU: "
            + getTotalRequestedCpu()
            + " Memory: "
            + (getTotalRequestedMemOffHeap() + getTotalRequestedMemOnHeap());
    }

    @Override
    public int hashCode() {
        return topologyId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopologyDetails)) {
            return false;
        }
        return (topologyId.equals(((TopologyDetails) o).getId()));
    }
}