blob: ddea221a313596a1696fcb5d5f325bc5eb6419f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler.resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a single node in the cluster.
*/
public class RAS_Node {
private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
private final String nodeId;
private final Cluster cluster;
private final Set<WorkerSlot> originallyFreeSlots;
//A map consisting of all workers on the node.
//The key of the map is the worker id and the value is the corresponding workerslot object
private Map<String, WorkerSlot> slots = new HashMap<>();
// A map describing which topologies are using which slots on this node. The format of the map is the following:
// {TopologyId -> {WorkerId -> {Executors}}}
private Map<String, Map<String, Collection<ExecutorDetails>>> topIdToUsedSlots = new HashMap<>();
private String hostname;
private boolean isAlive;
private SupervisorDetails sup;
private boolean loggedUnderageUsage = false;
/**
* Create a new node.
* @param nodeId the id of the node.
* @param sup the supervisor this is for.
* @param cluster the cluster this is a part of.
* @param workerIdToWorker the mapping of slots already assigned to this node.
* @param assignmentMap the mapping of executors already assigned to this node.
*/
public RAS_Node(
String nodeId,
SupervisorDetails sup,
Cluster cluster,
Map<String, WorkerSlot> workerIdToWorker,
Map<String, Map<String, Collection<ExecutorDetails>>> assignmentMap) {
//Node ID and supervisor ID are the same.
this.nodeId = nodeId;
if (sup == null) {
isAlive = false;
} else {
isAlive = !cluster.isBlackListed(this.nodeId);
}
this.cluster = cluster;
// initialize slots for this node
if (workerIdToWorker != null) {
slots = workerIdToWorker;
}
//initialize assignment map
if (assignmentMap != null) {
topIdToUsedSlots = assignmentMap;
}
//check if node is alive
if (isAlive && sup != null) {
hostname = sup.getHost();
this.sup = sup;
}
HashSet<String> freeById = new HashSet<>(slots.keySet());
if (assignmentMap != null) {
for (Map<String, Collection<ExecutorDetails>> assignment : assignmentMap.values()) {
freeById.removeAll(assignment.keySet());
}
}
originallyFreeSlots = new HashSet<>();
for (WorkerSlot slot : slots.values()) {
if (freeById.contains(slot.getId())) {
originallyFreeSlots.add(slot);
}
}
}
public String getId() {
return nodeId;
}
public String getHostname() {
return hostname;
}
private Collection<WorkerSlot> workerIdsToWorkers(Collection<String> workerIds) {
Collection<WorkerSlot> ret = new LinkedList<>();
for (String workerId : workerIds) {
ret.add(slots.get(workerId));
}
return ret;
}
/**
* Get the IDs of all free slots on this node.
* @return the ids of the free slots.
*/
public Collection<String> getFreeSlotsId() {
if (!isAlive) {
return new HashSet<>();
}
Set<String> ret = new HashSet<>(slots.keySet());
ret.removeAll(getUsedSlotsId());
return ret;
}
public Collection<WorkerSlot> getSlotsAvailableToScheduleOn() {
return originallyFreeSlots;
}
public Collection<WorkerSlot> getFreeSlots() {
return workerIdsToWorkers(getFreeSlotsId());
}
private Collection<String> getUsedSlotsId() {
Collection<String> ret = new LinkedList<>();
for (Map<String, Collection<ExecutorDetails>> entry : topIdToUsedSlots.values()) {
ret.addAll(entry.keySet());
}
return ret;
}
public Collection<WorkerSlot> getUsedSlots() {
return workerIdsToWorkers(getUsedSlotsId());
}
/**
* Get slots used by the given topology.
* @param topId the id of the topology to get.
* @return the slots currently assigned to that topology on this node.
*/
public Collection<WorkerSlot> getUsedSlots(String topId) {
if (topIdToUsedSlots.get(topId) != null) {
return workerIdsToWorkers(topIdToUsedSlots.get(topId).keySet());
} else {
return Collections.emptySet();
}
}
public boolean isAlive() {
return isAlive;
}
/**
* Get a collection of the topology ids currently running on this node.
*/
public Collection<String> getRunningTopologies() {
return topIdToUsedSlots.keySet();
}
public boolean isTotallyFree() {
return getUsedSlots().isEmpty();
}
public int totalSlotsFree() {
return getFreeSlots().size();
}
public int totalSlotsUsed() {
return getUsedSlots().size();
}
public int totalSlotsUsed(String topId) {
return getUsedSlots(topId).size();
}
public int totalSlots() {
return slots.size();
}
/**
* Free all slots on this node. This will update the Cluster too.
*/
public void freeAllSlots() {
if (!isAlive) {
LOG.warn("Freeing all slots on a dead node {} ", nodeId);
}
cluster.freeSlots(slots.values());
//clearing assignments
topIdToUsedSlots.clear();
}
/**
* frees a single executor.
*
* @param exec is the executor to free
* @param topo the topology the executor is a part of
*/
public void freeSingleExecutor(ExecutorDetails exec, TopologyDetails topo) {
Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(topo.getId());
if (usedSlots == null) {
throw new IllegalArgumentException("Topology " + topo + " is not assigned");
}
WorkerSlot ws = null;
Set<ExecutorDetails> updatedAssignment = new HashSet<>();
for (Entry<String, Collection<ExecutorDetails>> entry : usedSlots.entrySet()) {
if (entry.getValue().contains(exec)) {
ws = slots.get(entry.getKey());
updatedAssignment.addAll(entry.getValue());
updatedAssignment.remove(exec);
break;
}
}
if (ws == null) {
throw new IllegalArgumentException(
"Executor " + exec + " is not assinged on this node to " + topo);
}
free(ws);
if (!updatedAssignment.isEmpty()) {
assign(ws, topo, updatedAssignment);
}
}
/**
* Frees a single slot in this node.
*
* @param ws the slot to free
*/
public void free(WorkerSlot ws) {
LOG.debug("freeing WorkerSlot {} on node {}", ws, hostname);
if (!slots.containsKey(ws.getId())) {
throw new IllegalArgumentException(
"Tried to free a slot " + ws + " that was not" + " part of this node " + nodeId);
}
TopologyDetails topo = findTopologyUsingWorker(ws);
if (topo == null) {
throw new IllegalArgumentException("Tried to free a slot " + ws + " that was already free!");
}
//free slot
cluster.freeSlot(ws);
//cleanup internal assignments
topIdToUsedSlots.get(topo.getId()).remove(ws.getId());
}
/**
* Find a which topology is running on a worker slot.
*
* @return the topology using the worker slot. If worker slot is free then return null
*/
private TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
for (Entry<String, Map<String, Collection<ExecutorDetails>>> entry :
topIdToUsedSlots.entrySet()) {
String topoId = entry.getKey();
Set<String> workerIds = entry.getValue().keySet();
for (String workerId : workerIds) {
if (ws.getId().equals(workerId)) {
return cluster.getTopologies().getById(topoId);
}
}
}
return null;
}
/**
* Assigns a worker to a node.
*
* @param target the worker slot to assign the executors
* @param td the topology the executors are from
* @param executors executors to assign to the specified worker slot
*/
public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
if (!isAlive) {
throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
}
Collection<WorkerSlot> freeSlots = getFreeSlots();
if (freeSlots.isEmpty()) {
throw new IllegalStateException("Trying to assign to a full node " + nodeId);
}
if (executors.size() == 0) {
LOG.warn("Trying to assign nothing from " + td.getId() + " to " + nodeId + " (Ignored)");
}
if (target == null) {
target = getFreeSlots().iterator().next();
}
if (!freeSlots.contains(target)) {
throw new IllegalStateException(
"Trying to assign already used slot " + target.getPort() + " on node " + nodeId);
}
LOG.debug("target slot: {}", target);
cluster.assign(target, td.getId(), executors);
//assigning internally
topIdToUsedSlots.computeIfAbsent(td.getId(), (tid) -> new HashMap<>())
.computeIfAbsent(target.getId(), (tid) -> new LinkedList<>())
.addAll(executors);
}
/**
* Assign a single executor to a slot, even if other things are in the slot.
* @param ws the slot to assign it to.
* @param exec the executor to assign.
* @param td the topology for the executor.
*/
public void assignSingleExecutor(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
if (!isAlive) {
throw new IllegalStateException("Trying to adding to a dead node " + nodeId);
}
Collection<WorkerSlot> freeSlots = getFreeSlots();
Set<ExecutorDetails> toAssign = new HashSet<>();
toAssign.add(exec);
if (!freeSlots.contains(ws)) {
Map<String, Collection<ExecutorDetails>> usedSlots = topIdToUsedSlots.get(td.getId());
if (usedSlots == null) {
throw new IllegalArgumentException(
"Slot " + ws + " is not availble to schedue " + exec + " on");
}
Collection<ExecutorDetails> alreadyHere = usedSlots.get(ws.getId());
if (alreadyHere == null) {
throw new IllegalArgumentException(
"Slot " + ws + " is not availble to schedue " + exec + " on");
}
toAssign.addAll(alreadyHere);
free(ws);
}
assign(ws, td, toAssign);
}
/**
* Would scheduling exec in ws fit with the current resource constraints.
*
* @param ws the slot to possibly put exec in
* @param exec the executor to possibly place in ws
* @param td the topology exec is a part of
* @return true if it would fit else false
*/
public boolean wouldFit(WorkerSlot ws, ExecutorDetails exec, TopologyDetails td) {
assert nodeId.equals(ws.getNodeId()) : "Slot " + ws + " is not a part of this node " + nodeId;
return isAlive
&& cluster.wouldFit(
ws,
exec,
td,
getTotalAvailableResources(),
td.getTopologyWorkerMaxHeapSize()
);
}
/**
* Is there any possibility that exec could ever fit on this node.
* @param exec the executor to schedule
* @param td the topology the executor is a part of
* @return true if there is the possibility it might fit, no guarantee that it will, or false if there is no
* way it would ever fit.
*/
public boolean couldEverFit(ExecutorDetails exec, TopologyDetails td) {
if (!isAlive) {
return false;
}
NormalizedResourceOffer avail = getTotalAvailableResources();
NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
return avail.couldFit(cluster.getMinWorkerCpu(), requestedResources);
}
@Override
public boolean equals(Object other) {
if (other instanceof RAS_Node) {
return nodeId.equals(((RAS_Node) other).nodeId);
}
return false;
}
@Override
public int hashCode() {
return nodeId.hashCode();
}
@Override
public String toString() {
return "{Node: "
+ ((sup == null) ? "null (possibly down)" : sup.getHost())
+ ", Avail [ Mem: "
+ getAvailableMemoryResources()
+ ", CPU: "
+ getAvailableCpuResources()
+ ", Slots: "
+ this.getFreeSlots()
+ "] Total [ Mem: "
+ ((sup == null) ? "N/A" : this.getTotalMemoryResources())
+ ", CPU: "
+ ((sup == null) ? "N/A" : this.getTotalCpuResources())
+ ", Slots: "
+ this.slots.values()
+ " ]}";
}
/**
* Gets the available memory resources for this node.
*
* @return the available memory for this node
*/
public double getAvailableMemoryResources() {
return getTotalAvailableResources().getTotalMemoryMb();
}
/**
* Gets total resources for this node.
*/
public NormalizedResourceOffer getTotalResources() {
if (sup != null) {
return sup.getTotalResources();
} else {
return new NormalizedResourceOffer();
}
}
/**
* Gets all available resources for this node.
*
* @return All of the available resources.
*/
public NormalizedResourceOffer getTotalAvailableResources() {
if (sup != null) {
NormalizedResourceOffer availableResources = new NormalizedResourceOffer(sup.getTotalResources());
if (availableResources.remove(cluster.getAllScheduledResourcesForNode(sup.getId()), cluster.getResourceMetrics())) {
if (!loggedUnderageUsage) {
LOG.error("Resources on {} became negative and was clamped to 0 {}.", hostname, availableResources);
loggedUnderageUsage = true;
}
}
return availableResources;
} else {
return new NormalizedResourceOffer();
}
}
/**
* Gets the total memory resources for this node.
*
* @return the total memory for this node
*/
public double getTotalMemoryResources() {
if (sup != null) {
return sup.getTotalMemory();
} else {
return 0.0;
}
}
/**
* Gets the available cpu resources for this node.
*
* @return the available cpu for this node
*/
public double getAvailableCpuResources() {
return getTotalAvailableResources().getTotalCpu();
}
/**
* Gets the total cpu resources for this node.
*
* @return the total cpu for this node
*/
public double getTotalCpuResources() {
if (sup != null) {
return sup.getTotalCpu();
} else {
return 0.0;
}
}
}