/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.scheduler.resource.strategies;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.TreeMap;
import java.util.HashSet;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.scheduler.resource.Component;
import backtype.storm.scheduler.resource.RAS_Node;
public class ResourceAwareStrategy implements IStrategy {
private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareStrategy.class);
private Topologies _topologies;
private Cluster _cluster;
//Map key is the supervisor id and the value is the corresponding RAS_Node Object
private Map<String, RAS_Node> _availNodes;
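//Reference node: the node used by the most recent assignment; subsequent
//placements are biased toward nodes close to it (see distToNode and getBestWorker)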
private RAS_Node refNode = null;
/**
* supervisor id -> Node
*/
private Map<String, RAS_Node> _nodes;
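//rack id -> hostnames of the nodes in that rack (from the cluster's network topography)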
private Map<String, List<String>> _clusterInfo;
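//Weights for the three terms of the node-distance metric used in getBestWorker:
//CPU fit, memory fit, and network distance to the reference node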
private final double CPU_WEIGHT = 1.0;
private final double MEM_WEIGHT = 1.0;
private final double NETWORK_WEIGHT = 1.0;
public ResourceAwareStrategy(Cluster cluster, Topologies topologies) {
_topologies = topologies;
_cluster = cluster;
_nodes = RAS_Node.getAllNodesFrom(cluster, _topologies);
_availNodes = this.getAvailNodes();
_clusterInfo = cluster.getNetworkTopography();
LOG.debug(this.getClusterInfo());
}
/**
 * Map each component's rank in the BFS ordering (lower rank = closer to a spout)
 * to that component's still-unassigned executors; the returned TreeMap keeps the
 * ranks sorted.
 */
private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(
Queue<Component> ordered__Component_list, Collection<ExecutorDetails> unassignedExecutors) {
TreeMap<Integer, List<ExecutorDetails>> retMap = new TreeMap<>();
Integer rank = 0;
for (Component ras_comp : ordered__Component_list) {
retMap.put(rank, new ArrayList<ExecutorDetails>());
for(ExecutorDetails exec : ras_comp.execs) {
if(unassignedExecutors.contains(exec)) {
retMap.get(rank).add(exec);
}
}
rank++;
}
return retMap;
}
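/**
 * Schedule the unassigned executors of a topology.
 * Components are ordered by a breadth-first traversal from the spouts, and
 * executors are picked round-robin across that ordering (one executor per
 * component rank per pass). Left-over executors (typically system tasks) are
 * scheduled last.
 * @param td the topology to schedule
 * @return a map from worker slot to the executors assigned to it, or null if
 *         not all executors could be scheduled
 */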
public Map<WorkerSlot, Collection<ExecutorDetails>> schedule(TopologyDetails td) {
if (_availNodes.isEmpty()) {
LOG.warn("No available nodes to schedule tasks on!");
return null;
}
Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
List<Component> spouts = this.getSpouts(_topologies, td);
if (spouts.isEmpty()) {
LOG.error("Cannot find a Spout!");
return null;
}
Queue<Component> ordered__Component_list = bfs(_topologies, td, spouts);
Map<Integer, List<ExecutorDetails>> priorityToExecutorMap = getPriorityToExecutorDetailsListMap(ordered__Component_list, unassignedExecutors);
Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
Integer longestPriorityListSize = this.getLongestPriorityListSize(priorityToExecutorMap);
//Pick the first executor with priority 1, then the first executor with priority 2, and so on.
//Once we reach the last priority, we wrap around and schedule the second executor with priority 1.
for (int i = 0; i < longestPriorityListSize; i++) {
for (Entry<Integer, List<ExecutorDetails>> entry : priorityToExecutorMap.entrySet()) {
Iterator<ExecutorDetails> it = entry.getValue().iterator();
if (it.hasNext()) {
ExecutorDetails exec = it.next();
LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}",
new Object[] { exec, td.getExecutorToComponent().get(exec),
td.getTaskResourceReqList(exec), entry.getKey() });
WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
if (targetSlot != null) {
RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
if(!schedulerAssignmentMap.containsKey(targetSlot)) {
schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
}
schedulerAssignmentMap.get(targetSlot).add(exec);
targetNode.consumeResourcesforTask(exec, td);
scheduledTasks.add(exec);
LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
targetNode, targetNode.getAvailableMemoryResources(),
targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
targetNode.getTotalCpuResources(), targetSlot);
} else {
LOG.error("Not Enough Resources to schedule Task {}", exec);
}
it.remove();
}
}
}
executorsNotScheduled.removeAll(scheduledTasks);
LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
// schedule left over system tasks
for (ExecutorDetails exec : executorsNotScheduled) {
WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
if (targetSlot != null) {
RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
if(!schedulerAssignmentMap.containsKey(targetSlot)) {
schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
}
schedulerAssignmentMap.get(targetSlot).add(exec);
targetNode.consumeResourcesforTask(exec, td);
scheduledTasks.add(exec);
LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", exec,
targetNode, targetNode.getAvailableMemoryResources(),
targetNode.getAvailableCpuResources(), targetNode.getTotalMemoryResources(),
targetNode.getTotalCpuResources(), targetSlot);
} else {
LOG.error("Not Enough Resources to schedule Task {}", exec);
}
}
executorsNotScheduled.removeAll(scheduledTasks);
if (executorsNotScheduled.size() > 0) {
LOG.error("Not all executors successfully scheduled: {}",
executorsNotScheduled);
schedulerAssignmentMap = null;
} else {
LOG.debug("All resources successfully scheduled!");
}
if (schedulerAssignmentMap == null) {
LOG.error("Topology {} not successfully scheduled!", td.getId());
}
return schedulerAssignmentMap;
}
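/**
 * Find a worker slot for an executor. For the first assignment (no reference
 * node yet) the search is restricted to the rack with the most available
 * resources; afterwards candidate nodes are ranked by their distance to the
 * reference node, which is updated to the node of whichever slot is chosen.
 */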
private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
WorkerSlot ws;
// first scheduling
if (this.refNode == null) {
String clus = this.getBestClustering();
ws = this.getBestWorker(exec, td, clus, scheduleAssignmentMap);
} else {
ws = this.getBestWorker(exec, td, scheduleAssignmentMap);
}
if(ws != null) {
this.refNode = this.idToNode(ws.getNodeId());
}
LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
return ws;
}
private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
return this.getBestWorker(exec, td, null, scheduleAssignmentMap);
}
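/**
 * Pick the best worker slot for an executor, optionally restricted to a single rack.
 * Candidate nodes that can fit the task are ranked by a Euclidean distance over
 * three weighted terms: how tightly the task's CPU and memory requirements fit the
 * node's available resources, and the network distance to the reference node.
 * The first free slot on the closest node that satisfies the worker memory
 * constraint is returned, or null if no such slot exists.
 */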
private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
double taskMem = td.getTotalMemReqTask(exec);
double taskCPU = td.getTotalCpuReqTask(exec);
List<RAS_Node> nodes;
if(clusterId != null) {
nodes = this.getAvailableNodesFromCluster(clusterId);
} else {
nodes = this.getAvailableNodes();
}
//First, rank the nodes that have a free slot and enough resources for the task by distance (smallest first)
TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
for (RAS_Node n : nodes) {
if(n.getFreeSlots().size()>0) {
if (n.getAvailableMemoryResources() >= taskMem
&& n.getAvailableCpuResources() >= taskCPU) {
double a = Math.pow(((taskCPU - n.getAvailableCpuResources())/(n.getAvailableCpuResources() + 1))
* this.CPU_WEIGHT, 2);
double b = Math.pow(((taskMem - n.getAvailableMemoryResources())/(n.getAvailableMemoryResources() + 1))
* this.MEM_WEIGHT, 2);
double c = 0.0;
if(this.refNode != null) {
c = Math.pow(this.distToNode(this.refNode, n)
* this.NETWORK_WEIGHT, 2);
}
double distance = Math.sqrt(a + b + c);
nodeRankMap.put(distance, n);
}
}
}
//Then pick, starting from the closest node, the first free worker slot that satisfies the memory constraint
for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
RAS_Node n = entry.getValue();
for(WorkerSlot ws : n.getFreeSlots()) {
if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
return ws;
}
}
}
return null;
}
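/**
 * @return the id of the rack with the most total available resources
 *         (available memory + available CPU summed over its nodes)
 */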
private String getBestClustering() {
String bestCluster = null;
Double mostRes = 0.0;
for (Entry<String, List<String>> cluster : _clusterInfo
.entrySet()) {
Double clusterTotalRes = this.getTotalClusterRes(cluster.getValue());
if (clusterTotalRes > mostRes) {
mostRes = clusterTotalRes;
bestCluster = cluster.getKey();
}
}
return bestCluster;
}
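/**
 * @param cluster the hostnames of the nodes in a rack
 * @return the sum of available memory and available CPU over those nodes
 */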
private Double getTotalClusterRes(List<String> cluster) {
Double res = 0.0;
for (String node : cluster) {
res += _availNodes.get(this.NodeHostnameToId(node))
.getAvailableMemoryResources()
+ _availNodes.get(this.NodeHostnameToId(node))
.getAvailableCpuResources();
}
return res;
}
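/**
 * Coarse network distance between two nodes:
 * 0.0 for the same node, 0.5 for different nodes in the same rack,
 * 1.0 for nodes in different racks.
 */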
private Double distToNode(RAS_Node src, RAS_Node dest) {
if (src.getId().equals(dest.getId())) {
return 0.0;
} else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
return 0.5;
} else {
return 1.0;
}
}
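/**
 * @return the id of the rack that contains the given node's hostname,
 *         or null if the node is not found in any rack
 */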
private String NodeToCluster(RAS_Node node) {
for (Entry<String, List<String>> entry : _clusterInfo
.entrySet()) {
if (entry.getValue().contains(node.getHostname())) {
return entry.getKey();
}
}
LOG.error("Node: {} not found in any clusters", node.getHostname());
return null;
}
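/**
 * @return all available nodes across every rack
 */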
private List<RAS_Node> getAvailableNodes() {
LinkedList<RAS_Node> nodes = new LinkedList<>();
for (String clusterId : _clusterInfo.keySet()) {
nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
}
return nodes;
}
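/**
 * @param clus the rack id
 * @return the available nodes in that rack
 */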
private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
List<RAS_Node> retList = new ArrayList<>();
for (String node_id : _clusterInfo.get(clus)) {
retList.add(_availNodes.get(this
.NodeHostnameToId(node_id)));
}
return retList;
}
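/**
 * @param clusterId the rack id
 * @return the free worker slots on the nodes in that rack
 */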
private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
List<WorkerSlot> workers = new LinkedList<>();
for(RAS_Node node : nodes) {
workers.addAll(node.getFreeSlots());
}
return workers;
}
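/**
 * @return the free worker slots across every rack
 */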
private List<WorkerSlot> getAvailableWorker() {
List<WorkerSlot> workers = new LinkedList<>();
for (String clusterId : _clusterInfo.keySet()) {
workers.addAll(this.getAvailableWorkersFromCluster(clusterId));
}
return workers;
}
/**
 * Returns the nodes RAS may schedule on (currently all known nodes);
 * kept as a separate hook in case RAS is later restricted to a subset of nodes.
 */
private Map<String, RAS_Node> getAvailNodes() {
return _nodes;
}
/**
 * Breadth-first traversal of the topology DAG, treating stream connections
 * as undirected edges (both parents and children are visited).
 * @param topologies all topologies known to the scheduler
 * @param td the topology being scheduled
 * @param spouts the spout components used as traversal roots
 * @return A partial ordering of components, starting from the spouts
 */
private Queue<Component> bfs(Topologies topologies, TopologyDetails td, List<Component> spouts) {
// Queue is an interface, so use a LinkedList as the concrete implementation
Queue<Component> ordered__Component_list = new LinkedList<Component>();
HashMap<String, Component> visited = new HashMap<>();
/* start a breadth-first traversal from each spout that has not been visited yet */
for (Component spout : spouts) {
if (!visited.containsKey(spout.id)) {
Queue<Component> queue = new LinkedList<>();
queue.offer(spout);
while (!queue.isEmpty()) {
Component comp = queue.poll();
visited.put(comp.id, comp);
ordered__Component_list.add(comp);
List<String> neighbors = new ArrayList<>();
neighbors.addAll(comp.children);
neighbors.addAll(comp.parents);
for (String nbID : neighbors) {
if (!visited.containsKey(nbID)) {
Component child = topologies.getAllComponents().get(td.getId()).get(nbID);
queue.offer(child);
}
}
}
}
}
return ordered__Component_list;
}
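/**
 * @return all spout components of the given topology
 */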
private List<Component> getSpouts(Topologies topologies, TopologyDetails td) {
List<Component> spouts = new ArrayList<>();
for (Component c : topologies.getAllComponents().get(td.getId())
.values()) {
if (c.type == Component.ComponentType.SPOUT) {
spouts.add(c);
}
}
return spouts;
}
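/**
 * @return the size of the longest executor list across all priorities,
 *         i.e. the number of round-robin passes needed in schedule()
 */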
private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> priorityToExecutorMap) {
Integer mostNum = 0;
for (List<ExecutorDetails> execs : priorityToExecutorMap.values()) {
Integer numExecs = execs.size();
if (mostNum < numExecs) {
mostNum = numExecs;
}
}
return mostNum;
}
/**
 * Get the remaining amount of memory that can be assigned to a worker,
 * given the topology's configured worker max heap size.
 * @param ws the worker slot
 * @param td the topology being scheduled
 * @param scheduleAssignmentMap the in-progress assignment
 * @return The remaining amount of memory
 */
private Double getWorkerScheduledMemoryAvailable(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
Double memScheduleUsed = this.getWorkerScheduledMemoryUse(ws, td, scheduleAssignmentMap);
return td.getTopologyWorkerMaxHeapSize() - memScheduleUsed;
}
/**
 * Get the amount of memory already assigned to a worker in the in-progress assignment.
 * @param ws the worker slot
 * @param td the topology being scheduled
 * @param scheduleAssignmentMap the in-progress assignment
 * @return the amount of memory already scheduled onto the worker
 */
private Double getWorkerScheduledMemoryUse(WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
Double totalMem = 0.0;
Collection<ExecutorDetails> execs = scheduleAssignmentMap.get(ws);
if(execs != null) {
for(ExecutorDetails exec : execs) {
totalMem += td.getTotalMemReqTask(exec);
}
}
return totalMem;
}
/**
 * Checks whether executor exec can be scheduled on worker slot ws.
 * Currently only memory is considered; CPU may be included in the future.
 * @param exec the executor to place
 * @param ws the candidate worker slot
 * @param td the topology being scheduled
 * @param scheduleAssignmentMap the in-progress assignment
 * @return true if exec can be scheduled on ws, false otherwise
 */
private boolean checkWorkerConstraints(ExecutorDetails exec, WorkerSlot ws, TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
return this.getWorkerScheduledMemoryAvailable(ws, td, scheduleAssignmentMap) >= td.getTotalMemReqTask(exec);
}
/**
 * Summarize the available and total resources of each node, grouped by rack.
 * @return a String with cluster resource info, for debugging
 */
private String getClusterInfo() {
StringBuilder retVal = new StringBuilder("Cluster info:\n");
for(Entry<String, List<String>> clusterEntry : _clusterInfo.entrySet()) {
String clusterId = clusterEntry.getKey();
retVal.append("Rack: ").append(clusterId).append("\n");
for(String nodeHostname : clusterEntry.getValue()) {
RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
retVal.append("-> Node: ").append(node.getHostname()).append(" ").append(node.getId()).append("\n");
retVal.append("--> Avail Resources: {Mem ").append(node.getAvailableMemoryResources())
.append(", CPU ").append(node.getAvailableCpuResources()).append("}\n");
retVal.append("--> Total Resources: {Mem ").append(node.getTotalMemoryResources())
.append(", CPU ").append(node.getTotalCpuResources()).append("}\n");
}
}
return retVal.toString();
}
/**
 * Find the id of the node with the given hostname.
 * @param hostname the hostname to look up
 * @return the id of the matching node, or null if no node has that hostname
 */
public String NodeHostnameToId(String hostname) {
for (RAS_Node n : _nodes.values()) {
if (n.getHostname() == null) {
continue;
}
if (n.getHostname().equals(hostname)) {
return n.getId();
}
}
LOG.error("Cannot find Node with hostname {}", hostname);
return null;
}
/**
 * Find the RAS_Node for the specified node id.
 * @param id the node (supervisor) id
 * @return the matching RAS_Node, or null if no node with that id exists
 */
public RAS_Node idToNode(String id) {
if (!_nodes.containsKey(id)) {
LOG.error("Cannot find Node with Id: {}", id);
return null;
}
return _nodes.get(id);
}
}