/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.scheduler.resource;

import backtype.storm.Config;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.scheduler.resource.strategies.ResourceAwareStrategy;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
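/**
 * A scheduler that takes the memory and CPU resources requested by each
 * topology, and the resources available on each node, into account when
 * assigning executors to worker slots. The actual placement decisions are
 * delegated to a {@link ResourceAwareStrategy}. Typically enabled by setting
 * the "storm.scheduler" config on Nimbus to this class.
 */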
public class ResourceAwareScheduler implements IScheduler {
private static final Logger LOG = LoggerFactory.getLogger(ResourceAwareScheduler.class);
@SuppressWarnings("rawtypes")
private Map _conf;
@Override
public void prepare(Map conf) {
_conf = conf;
}
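/**
 * Schedules every topology that still has unassigned executors, applies the
 * assignments produced by the strategy, and records the requested versus
 * assigned resources for each topology on the cluster.
 */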
@Override
public void schedule(Topologies topologies, Cluster cluster) {
LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
ResourceAwareStrategy raStrategy = new ResourceAwareStrategy(cluster, topologies);
if (LOG.isDebugEnabled()) {
    LOG.debug(printScheduling(cluster, topologies));
}
for (TopologyDetails td : topologies.getTopologies()) {
String topId = td.getId();
Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap;
if (!cluster.getUnassignedExecutors(td).isEmpty()) {
LOG.debug("/********Scheduling topology {} ************/", topId);
schedulerAssignmentMap = raStrategy.schedule(td);
double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
double requestedCpu = td.getTotalRequestedCpu();
double assignedMemOnHeap = 0.0;
double assignedMemOffHeap = 0.0;
double assignedCpu = 0.0;
if (schedulerAssignmentMap != null) {
try {
Set<String> nodesUsed = new HashSet<String>();
int assignedWorkers = schedulerAssignmentMap.size();
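// Apply the strategy's proposed assignment slot by slot, tallying the memory and CPU that actually get placed.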
for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
WorkerSlot targetSlot = workerToTasksEntry.getKey();
Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
RAS_Node targetNode = raStrategy.idToNode(targetSlot.getNodeId());
targetNode.assign(targetSlot, td, execsNeedScheduling, cluster);
LOG.debug("ASSIGNMENT TOPOLOGY: {} TASKS: {} To Node: {} on Slot: {}",
td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort());
nodesUsed.add(targetNode.getId());
assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
assignedCpu += targetSlot.getAllocatedCpu();
}
LOG.debug("Topology: {} assigned to {} nodes on {} workers", td.getId(), nodesUsed.size(), assignedWorkers);
cluster.setStatus(td.getId(), "Fully Scheduled");
} catch (IllegalStateException ex) {
LOG.error(ex.toString());
LOG.error("Unsuccessful in scheduling {}", td.getId());
cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
}
} else {
LOG.error("Unsuccessful in scheduling", td.getId());
cluster.setStatus(td.getId(), "Unsuccessful in scheduling");
}
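// Order matters: requested {on-heap mem, off-heap mem, CPU} followed by the
// assigned counterparts, as consumed by cluster.setResources() below.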
Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
LOG.debug("setResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " +
"assigned on-heap mem, off-heap mem, cpu: {} {} {}",
td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
cluster.setResources(td.getId(), resources);
} else {
cluster.setStatus(td.getId(), "Fully Scheduled");
}
}
updateSupervisorsResources(cluster, topologies);
}
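/**
 * Records, for every supervisor node, its total and currently used memory
 * and CPU, so the cluster can report per-supervisor utilization.
 */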
private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
Map<String, Double[]> supervisorsResources = new HashMap<String, Double[]>();
Map<String, RAS_Node> nodes = RAS_Node.getAllNodesFrom(cluster, topologies);
for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
    RAS_Node node = entry.getValue();
    Double totalMem = node.getTotalMemoryResources();
    Double totalCpu = node.getTotalCpuResources();
    Double usedMem = totalMem - node.getAvailableMemoryResources();
    Double usedCpu = totalCpu - node.getAvailableCpuResources();
    Double[] resources = {totalMem, totalCpu, usedMem, usedCpu};
    supervisorsResources.put(entry.getKey(), resources);
}
cluster.setSupervisorsResourcesMap(supervisorsResources);
}
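/**
 * Reads the default per-component on-heap and off-heap memory settings from
 * the scheduler configuration.
 */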
private Map<String, Double> getUserConf() {
Map<String, Double> ret = new HashMap<String, Double>();
ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
(Double) _conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB));
ret.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
(Double) _conf.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB));
return ret;
}
/**
 * Builds a human-readable summary of the current scheduling for debugging.
 * @param cluster the cluster whose assignments should be printed
 * @param topologies all topologies known to the scheduler
 * @return a summary of node -> topology -> slot -> executor assignments
 */
public String printScheduling(Cluster cluster, Topologies topologies) {
StringBuilder str = new StringBuilder();
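// schedulingMap: node id -> topology id -> worker slot -> executors assigned to that slot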
Map<String, Map<String, Map<WorkerSlot, Collection<ExecutorDetails>>>> schedulingMap = new HashMap<String, Map<String, Map<WorkerSlot, Collection<ExecutorDetails>>>>();
for (TopologyDetails topo : topologies.getTopologies()) {
if (cluster.getAssignmentById(topo.getId()) != null) {
for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
WorkerSlot slot = entry.getValue();
String nodeId = slot.getNodeId();
ExecutorDetails exec = entry.getKey();
if (!schedulingMap.containsKey(nodeId)) {
    schedulingMap.put(nodeId, new HashMap<String, Map<WorkerSlot, Collection<ExecutorDetails>>>());
}
if (!schedulingMap.get(nodeId).containsKey(topo.getId())) {
    schedulingMap.get(nodeId).put(topo.getId(), new HashMap<WorkerSlot, Collection<ExecutorDetails>>());
}
if (!schedulingMap.get(nodeId).get(topo.getId()).containsKey(slot)) {
    schedulingMap.get(nodeId).get(topo.getId()).put(slot, new LinkedList<ExecutorDetails>());
}
schedulingMap.get(nodeId).get(topo.getId()).get(slot).add(exec);
}
}
}
for (Map.Entry<String, Map<String, Map<WorkerSlot, Collection<ExecutorDetails>>>> entry : schedulingMap.entrySet()) {
if (cluster.getSupervisorById(entry.getKey()) != null) {
str.append("/** Node: " + cluster.getSupervisorById(entry.getKey()).getHost() + "-" + entry.getKey() + " **/\n");
} else {
str.append("/** Node: Unknown may be dead -" + entry.getKey() + " **/\n");
}
for (Map.Entry<String, Map<WorkerSlot, Collection<ExecutorDetails>>> topo_sched : schedulingMap.get(entry.getKey()).entrySet()) {
str.append("\t-->Topology: " + topo_sched.getKey() + "\n");
for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> ws : topo_sched.getValue().entrySet()) {
str.append("\t\t->Slot [" + ws.getKey().getPort() + "] -> " + ws.getValue() + "\n");
}
}
}
return str.toString();
}
}