/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package backtype.storm.scheduler.resource;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
/**
* Represents a single node in the cluster.
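 * <p>
 * A minimal usage sketch (assumes a populated {@code cluster} and {@code topologies};
 * real scheduling strategies do more bookkeeping than this):
 * <pre>
 * Map&lt;String, RAS_Node&gt; nodes = RAS_Node.getAllNodesFrom(cluster, topologies);
 * for (RAS_Node node : nodes.values()) {
 *     if (node.isAlive() &amp;&amp; node.totalSlotsFree() &gt; 0) {
 *         // node is a candidate for new assignments
 *     }
 * }
 * </pre>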
*/
public class RAS_Node {
private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
private final String _nodeId;
private String _hostname;
private boolean _isAlive;
private SupervisorDetails _sup;
private Double _availMemory;
private Double _availCPU;
private List<WorkerSlot> _slots;
private List<ExecutorDetails> _execs;
private Map<WorkerSlot, List<ExecutorDetails>> _slotToExecs;
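    /**
     * @param nodeId the id of this node (the same as the supervisor id)
     * @param allPorts the worker ports available on this node, or null for a dead node
     * @param isAlive whether the supervisor on this node is alive
     * @param sup the SupervisorDetails for this node, or null for a dead node
     */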
public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
SupervisorDetails sup) {
_slots = new ArrayList<WorkerSlot>();
_execs = new ArrayList<ExecutorDetails>();
_slotToExecs = new HashMap<WorkerSlot, List<ExecutorDetails>>();
_nodeId = nodeId;
_isAlive = isAlive;
if (_isAlive && allPorts != null) {
for (int port : allPorts) {
_freeSlots.add(new WorkerSlot(_nodeId, port));
}
_sup = sup;
_hostname = sup.getHost();
_availMemory = this.getTotalMemoryResources();
_availCPU = this.getTotalCpuResources();
_slots.addAll(_freeSlots);
for (WorkerSlot ws : _slots) {
_slotToExecs.put(ws, new ArrayList<ExecutorDetails>());
}
}
}
public String getId() {
return _nodeId;
}
public String getHostname() {
return _hostname;
}
public Collection<WorkerSlot> getFreeSlots() {
return _freeSlots;
}
public boolean isAlive() {
return _isAlive;
}
/**
* @return a collection of the topology ids currently running on this node
*/
public Collection<String> getRunningTopologies() {
return _topIdToUsedSlots.keySet();
}
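    /**
     * @return true if no topology is currently using any slots on this node
     */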
public boolean isTotallyFree() {
return _topIdToUsedSlots.isEmpty();
}
public int totalSlotsFree() {
return _freeSlots.size();
}
public int totalSlotsUsed() {
int total = 0;
for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
total += slots.size();
}
return total;
}
public int totalSlots() {
return totalSlotsFree() + totalSlotsUsed();
}
public int totalSlotsUsed(String topId) {
int total = 0;
Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
if (slots != null) {
total = slots.size();
}
return total;
}
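    /**
     * Checks that the given slot actually belongs to this node.
     * @param ws the slot to validate
     */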
private void validateSlot(WorkerSlot ws) {
if (!_nodeId.equals(ws.getNodeId())) {
            throw new IllegalArgumentException(
                "Trying to add slot " + ws + " to the wrong node; it is not a part of " + _nodeId);
}
}
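    /**
     * Adds a slot that is part of an existing assignment on a dead node, so the
     * now unsupervised worker can still be tracked and freed.
     * @param ws the orphaned slot to add
     */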
private void addOrphanedSlot(WorkerSlot ws) {
if (_isAlive) {
            throw new IllegalArgumentException("Orphaned slots are only allowed on dead nodes.");
}
validateSlot(ws);
if (_freeSlots.contains(ws)) {
return;
}
for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
if (used.contains(ws)) {
return;
}
}
_freeSlots.add(ws);
_slotToExecs.put(ws, new ArrayList<ExecutorDetails>());
}
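    /**
     * Assigns a worker slot to a topology on this node without updating the Cluster.
     * @param ws the slot to assign
     * @param topId the id of the topology the slot is assigned to
     * @param dontThrow if true, report a double assignment by returning true instead of throwing
     * @return true if the slot was not free (only possible when dontThrow is set), false on success
     */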
boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
validateSlot(ws);
if (!_freeSlots.remove(ws)) {
if (dontThrow) {
return true;
}
throw new IllegalStateException("Assigning a slot that was not free " + ws);
}
Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
if (usedSlots == null) {
usedSlots = new HashSet<WorkerSlot>();
_topIdToUsedSlots.put(topId, usedSlots);
}
usedSlots.add(ws);
return false;
}
/**
* Free all slots on this node. This will update the Cluster too.
* @param cluster the cluster to be updated
*/
public void freeAllSlots(Cluster cluster) {
if (!_isAlive) {
LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
}
for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
cluster.freeSlots(entry.getValue());
if (_isAlive) {
_freeSlots.addAll(entry.getValue());
}
}
_topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
}
/**
* Frees a single slot in this node
* @param ws the slot to free
* @param cluster the cluster to update
*/
public void free(WorkerSlot ws, Cluster cluster) {
        if (_freeSlots.contains(ws)) {
            return;
        }
for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
Set<WorkerSlot> slots = entry.getValue();
if (slots.remove(ws)) {
cluster.freeSlot(ws);
if (_isAlive) {
_freeSlots.add(ws);
}
return;
}
}
throw new IllegalArgumentException("Tried to free a slot that was not" +
" part of this node " + _nodeId);
}
/**
* Frees all the slots for a topology.
* @param topId the topology to free slots for
* @param cluster the cluster to update
*/
public void freeTopology(String topId, Cluster cluster) {
Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
if (slots == null || slots.isEmpty()) {
return;
}
for (WorkerSlot ws : slots) {
cluster.freeSlot(ws);
if (_isAlive) {
_freeSlots.add(ws);
}
}
_topIdToUsedSlots.remove(topId);
}
/**
     * Allocates memory and CPU resources to the assigned slot for the topology's executors.
* @param td the TopologyDetails that the slot is assigned to.
* @param executors the executors to run in that slot.
* @param slot the slot to allocate resource to
*/
    public void allocateResourceToSlot(TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) {
double onHeapMem = 0.0;
double offHeapMem = 0.0;
double cpu = 0.0;
for (ExecutorDetails exec : executors) {
Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
if (onHeapMemForExec != null) {
onHeapMem += onHeapMemForExec;
}
Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
if (offHeapMemForExec != null) {
offHeapMem += offHeapMemForExec;
}
Double cpuForExec = td.getTotalCpuReqTask(exec);
if (cpuForExec != null) {
cpu += cpuForExec;
}
}
slot.allocateResource(onHeapMem, offHeapMem, cpu);
}
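    /**
     * Assigns executors from the given topology to a specific slot on this node,
     * allocating the slot's resources and updating the Cluster as well.
     * @param target the free slot to assign to, or null to pick an arbitrary free slot
     * @param td the TopologyDetails to assign
     * @param executors the executors to run in that slot
     * @param cluster the cluster to be updated
     */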
public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors,
Cluster cluster) {
if (!_isAlive) {
            throw new IllegalStateException("Trying to assign to a dead node " + _nodeId);
}
if (_freeSlots.isEmpty()) {
throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
}
        if (executors.isEmpty()) {
            LOG.warn("Trying to assign nothing from {} to {} (Ignored)", td.getId(), _nodeId);
}
if (target == null) {
target = _freeSlots.iterator().next();
}
if (!_freeSlots.contains(target)) {
            throw new IllegalStateException("Trying to assign already used slot " + target.getPort() + " on node " + _nodeId);
} else {
allocateResourceToSlot(td, executors, target);
cluster.assign(target, td.getId(), executors);
assignInternal(target, td.getId(), false);
}
}
/**
     * Assigns a free slot on this node to the given topology and executors.
* This will update the cluster too.
* @param td the TopologyDetails to assign a free slot to.
* @param executors the executors to run in that slot.
* @param cluster the cluster to be updated
*/
public void assign(TopologyDetails td, Collection<ExecutorDetails> executors,
Cluster cluster) {
this.assign(null, td, executors, cluster);
}
@Override
public boolean equals(Object other) {
if (other instanceof RAS_Node) {
return _nodeId.equals(((RAS_Node) other)._nodeId);
}
return false;
}
@Override
public int hashCode() {
return _nodeId.hashCode();
}
@Override
public String toString() {
        // Use the cached hostname and plain concatenation so dead nodes
        // (null _sup / null resources) do not throw a NullPointerException.
        return "{Node: " + _hostname + ", AvailMem: " + _availMemory + ", AvailCPU: " + _availCPU + "}";
}
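    /**
     * @return the number of slots the given topology is using across the given nodes
     */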
public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
int total = 0;
for (RAS_Node n : nodes) {
total += n.totalSlotsUsed(topId);
}
return total;
}
public static int countSlotsUsed(Collection<RAS_Node> nodes) {
int total = 0;
for (RAS_Node n : nodes) {
total += n.totalSlotsUsed();
}
return total;
}
public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
int total = 0;
for (RAS_Node n : nodes) {
if (n.isAlive()) {
total += n.totalSlotsFree();
}
}
return total;
}
public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
int total = 0;
for (RAS_Node n : nodes) {
if (n.isAlive()) {
total += n.totalSlots();
}
}
return total;
}
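    /**
     * Builds a RAS_Node for every supervisor in the cluster, folds in the existing
     * assignments (adding orphaned slots for workers on dead supervisors), and then
     * recomputes the available resources on every node.
     * @param cluster the cluster to read supervisors and assignments from
     * @param topologies container of all topologies
     * @return a map from node id to RAS_Node, or null if the assignment state is inconsistent
     */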
public static Map<String, RAS_Node> getAllNodesFrom(Cluster cluster, Topologies topologies) {
Map<String, RAS_Node> nodeIdToNode = new HashMap<String, RAS_Node>();
for (SupervisorDetails sup : cluster.getSupervisors().values()) {
//Node ID and supervisor ID are the same.
String id = sup.getId();
boolean isAlive = !cluster.isBlackListed(id);
LOG.debug("Found a {} Node {} {}",
isAlive ? "living" : "dead", id, sup.getAllPorts());
LOG.debug("resources_mem: {}, resources_CPU: {}", sup.getTotalMemory(), sup.getTotalCPU());
nodeIdToNode.put(sup.getId(), new RAS_Node(id, sup.getAllPorts(), isAlive, sup));
}
for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments().entrySet()) {
String topId = entry.getValue().getTopologyId();
for (WorkerSlot workerSlot : entry.getValue().getSlots()) {
String id = workerSlot.getNodeId();
RAS_Node node = nodeIdToNode.get(id);
if (node == null) {
LOG.info("Found an assigned slot on a dead supervisor {} with executors {}",
workerSlot, getExecutors(workerSlot, cluster));
node = new RAS_Node(id, null, false, null);
nodeIdToNode.put(id, node);
}
if (!node.isAlive()) {
                    //The supervisor on this node is down, so add an orphaned slot to hold the unsupervised worker
node.addOrphanedSlot(workerSlot);
}
if (node.assignInternal(workerSlot, topId, true)) {
                    LOG.warn("Bad scheduling state, {} assigned multiple workers, unassigning everything...", workerSlot);
node.free(workerSlot, cluster);
}
}
}
RAS_Node.updateAvailableResources(cluster, topologies, nodeIdToNode);
for (Map.Entry<String, SchedulerAssignment> entry : cluster
.getAssignments().entrySet()) {
for (Map.Entry<ExecutorDetails, WorkerSlot> exec : entry.getValue()
.getExecutorToSlot().entrySet()) {
ExecutorDetails ed = exec.getKey();
WorkerSlot ws = exec.getValue();
String node_id = ws.getNodeId();
if (nodeIdToNode.containsKey(node_id)) {
RAS_Node node = nodeIdToNode.get(node_id);
if (node._slotToExecs.containsKey(ws)) {
node._slotToExecs.get(ws).add(ed);
node._execs.add(ed);
} else {
                        LOG.error("Node {} does not have worker slot {} for executor {}",
                                node_id, ws, ed);
return null;
}
} else {
                    LOG.error("Node {} referenced by an assignment is missing", node_id);
return null;
}
}
}
return nodeIdToNode;
}
//This function is only used for logging information
private static Collection<ExecutorDetails> getExecutors(WorkerSlot ws,
Cluster cluster) {
Collection<ExecutorDetails> retList = new ArrayList<ExecutorDetails>();
for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
.entrySet()) {
Map<ExecutorDetails, WorkerSlot> executorToSlot = entry.getValue()
.getExecutorToSlot();
for (Map.Entry<ExecutorDetails, WorkerSlot> execToSlot : executorToSlot
.entrySet()) {
WorkerSlot slot = execToSlot.getValue();
if (ws.getPort() == slot.getPort()
&& ws.getNodeId().equals(slot.getNodeId())) {
ExecutorDetails exec = execToSlot.getKey();
retList.add(exec);
}
}
}
return retList;
}
/**
     * Updates the available resources for every node in a cluster
     * by subtracting the memory and CPU consumed by the existing assignments.
* @param cluster the cluster used in this calculation
* @param topologies container of all topologies
* @param nodeIdToNode a map between node id and node
*/
private static void updateAvailableResources(Cluster cluster,
Topologies topologies,
Map<String, RAS_Node> nodeIdToNode) {
        //recompute available memory and cpu
        if (!cluster.getAssignments().isEmpty()) {
for (Entry<String, SchedulerAssignment> entry : cluster.getAssignments()
.entrySet()) {
Map<ExecutorDetails, WorkerSlot> executorToSlot = entry.getValue()
.getExecutorToSlot();
Map<ExecutorDetails, Double> topoMemoryResourceList = topologies.getById(entry.getKey()).getTotalMemoryResourceList();
                if (topoMemoryResourceList == null || topoMemoryResourceList.isEmpty()) {
continue;
}
for (Map.Entry<ExecutorDetails, WorkerSlot> execToSlot : executorToSlot
.entrySet()) {
WorkerSlot slot = execToSlot.getValue();
ExecutorDetails exec = execToSlot.getKey();
RAS_Node node = nodeIdToNode.get(slot.getNodeId());
                    if (!node.isAlive()) {
                        // We do not free the assigned slots (the orphaned slots) on the inactive supervisors.
                        // The inactive node is treated as a 0-resource node and is not available for other unassigned workers.
                        continue;
                    }
if (topoMemoryResourceList.containsKey(exec)) {
node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
} else {
                        LOG.warn("Resource requirements not found; scheduling task {} with the defaults from {}, {} and {}",
                                exec,
                                Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB,
                                Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB,
                                Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
topologies.getById(entry.getKey()).addDefaultResforExec(exec);
node.consumeResourcesforTask(exec, topologies.getById(entry.getKey()));
}
}
}
        } else {
            // No assignments in the cluster; just normalize any null available memory to 0.0.
            for (RAS_Node n : nodeIdToNode.values()) {
n.setAvailableMemory(n.getAvailableMemoryResources());
}
}
}
/**
     * Sets the available memory for this node
* @param amount the amount to set as available memory
*/
public void setAvailableMemory(Double amount) {
_availMemory = amount;
}
/**
* Gets the available memory resources for this node
* @return the available memory for this node
*/
public Double getAvailableMemoryResources() {
if (_availMemory == null) {
return 0.0;
}
return _availMemory;
}
/**
* Gets the total memory resources for this node
* @return the total memory for this node
*/
public Double getTotalMemoryResources() {
if (_sup != null && _sup.getTotalMemory() != null) {
return _sup.getTotalMemory();
} else {
return 0.0;
}
}
/**
* Consumes a certain amount of memory for this node
     * @param amount the amount of memory to consume from this node
* @return the current available memory for this node after consumption
*/
public Double consumeMemory(Double amount) {
if (amount > _availMemory) {
LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, _availMemory);
return null;
}
_availMemory = _availMemory - amount;
return _availMemory;
}
/**
* Gets the available cpu resources for this node
* @return the available cpu for this node
*/
public Double getAvailableCpuResources() {
if (_availCPU == null) {
return 0.0;
}
return _availCPU;
}
/**
* Gets the total cpu resources for this node
* @return the total cpu for this node
*/
public Double getTotalCpuResources() {
if (_sup != null && _sup.getTotalCPU() != null) {
return _sup.getTotalCPU();
} else {
return 0.0;
}
}
/**
* Consumes a certain amount of cpu for this node
     * @param amount the amount of CPU to consume from this node
* @return the current available cpu for this node after consumption
*/
public Double consumeCPU(Double amount) {
if (amount > _availCPU) {
LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, _availCPU);
return null;
}
_availCPU = _availCPU - amount;
return _availCPU;
}
/**
     * Consumes a certain amount of resources for an executor in a topology.
     * @param exec the executor that is consuming resources on this node
     * @param topo the topology the executor is a part of
*/
public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
Double taskMemReq = topo.getTotalMemReqTask(exec);
Double taskCpuReq = topo.getTotalCpuReqTask(exec);
this.consumeCPU(taskCpuReq);
this.consumeMemory(taskMemReq);
}
}