blob: 465fda0c07bce31c9182fdf476a7fc72dc3a7ac1 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RasNode;
import org.apache.storm.scheduler.resource.RasNodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConnectionCount;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByProximity;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorter;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class BaseResourceAwareStrategy implements IStrategy {
private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
* Different node sorting types available. Two of these are for backward compatibility.
* The last one (COMMON) is the new sorting type used across the board.
* Refer to {@link NodeSorter#NodeSorter(Cluster, TopologyDetails, NodeSortType)} for more details.
public enum NodeSortType {
GENERIC_RAS, // for deprecation, Used by GenericResourceAwareStrategyOld
DEFAULT_RAS, // for deprecation, Used by DefaultResourceAwareStrategyOld
COMMON // new and only node sorting type going forward
// instance variables from class instantiation
protected final boolean sortNodesForEachExecutor;
protected final NodeSortType nodeSortType;
// instance variable set by two IStrategy methods
protected Map<String, Object> config;
protected Cluster cluster;
protected TopologyDetails topologyDetails;
// Instance variables derived from Cluster.
protected RasNodes nodes;
private Map<String, List<String>> networkTopography;
private Map<String, List<RasNode>> hostnameToNodes;
// Instance variables derived from TopologyDetails
protected String topoName;
protected Map<String, Set<ExecutorDetails>> compToExecs;
protected Map<ExecutorDetails, String> execToComp;
protected boolean orderExecutorsByProximity;
private long maxSchedulingTimeMs;
// Instance variables from Cluster and TopologyDetails.
Set<ExecutorDetails> unassignedExecutors;
private int maxStateSearch;
protected SchedulingSearcherState searcherState;
protected IExecSorter execSorter;
protected INodeSorter nodeSorter;
public BaseResourceAwareStrategy() {
this(true, NodeSortType.COMMON);
* Initialize for the default implementation of schedule().
* @param sortNodesForEachExecutor Sort nodes before scheduling each executor.
* @param nodeSortType type of sorting to be applied to object resource collection {@link NodeSortType}.
public BaseResourceAwareStrategy(boolean sortNodesForEachExecutor, NodeSortType nodeSortType) {
this.sortNodesForEachExecutor = sortNodesForEachExecutor;
this.nodeSortType = nodeSortType;
public void prepare(Map<String, Object> config) {
this.config = config;
* Note that this method is not thread-safe.
* Several instance variables are generated from supplied
* parameters. In addition, the following instance variables are set to complete scheduling:
* <li>{@link #searcherState}</li>
* <li>{@link #execSorter} to sort executors</li>
* <li>{@link #nodeSorter} to sort nodes</li>
* <p>
* Scheduling consists of three main steps:
* <li>{@link #prepareForScheduling(Cluster, TopologyDetails)}</li>
* <li>{@link #checkSchedulingFeasibility()}, and</li>
* <li>{@link #scheduleExecutorsOnNodes(List, Iterable)}</li>
* </p><p>
* The executors and nodes are sorted in the order most conducive to scheduling for the strategy.
* Those interfaces may be overridden by subclasses using mutators:
* <li>{@link #setExecSorter(IExecSorter)} and</li>
* <li>{@link #setNodeSorter(INodeSorter)}</li>
* @param cluster on which executors will be scheduled.
* @param td the topology to schedule for.
* @return result of scheduling (success, failure, or null when interrupted).
public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
prepareForScheduling(cluster, td);
// early detection of success or failure
SchedulingResult earlyResult = checkSchedulingFeasibility();
if (earlyResult != null) {
return earlyResult;
LOG.debug("Topology {} {} Number of ExecutorsNeedScheduling: {}", topoName, topologyDetails.getId(), unassignedExecutors.size());
//order executors to be scheduled
List<ExecutorDetails> orderedExecutors = execSorter.sortExecutors(unassignedExecutors);
Iterable<String> sortedNodes = null;
if (!this.sortNodesForEachExecutor) {
sortedNodes = nodeSorter.sortAllNodes(null);
return scheduleExecutorsOnNodes(orderedExecutors, sortedNodes);
* Initialize instance variables as the first step in {@link #schedule(Cluster, TopologyDetails)}.
* This method may be extended by subclasses to initialize additional variables as in
* {@link ConstraintSolverStrategy#prepareForScheduling(Cluster, TopologyDetails)}.
* @param cluster on which executors will be scheduled.
* @param topologyDetails to be scheduled.
protected void prepareForScheduling(Cluster cluster, TopologyDetails topologyDetails) {
this.cluster = cluster;
this.topologyDetails = topologyDetails;
// from Cluster
this.nodes = new RasNodes(cluster);
networkTopography = cluster.getNetworkTopography();
hostnameToNodes = this.nodes.getHostnameToNodes();
// from TopologyDetails
topoName = topologyDetails.getName();
execToComp = topologyDetails.getExecutorToComponent();
compToExecs = topologyDetails.getComponentToExecutors();
Map<String, Object> topoConf = topologyDetails.getConf();
orderExecutorsByProximity = isOrderByProximity(topoConf);
maxSchedulingTimeMs = computeMaxSchedulingTimeMs(topoConf);
// From Cluster and TopologyDetails - and cleaned-up
unassignedExecutors = Collections.unmodifiableSet(new HashSet<>(cluster.getUnassignedExecutors(topologyDetails)));
int confMaxStateSearch = getMaxStateSearchFromTopoConf(topologyDetails.getConf());
int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
maxStateSearch = Math.min(daemonMaxStateSearch, confMaxStateSearch);
LOG.debug("The max state search configured by topology {} is {}", topologyDetails.getId(), confMaxStateSearch);
LOG.debug("The max state search that will be used by topology {} is {}", topologyDetails.getId(), maxStateSearch);
searcherState = createSearcherState();
setNodeSorter(new NodeSorter(cluster, topologyDetails, nodeSortType));
? new ExecSorterByProximity(topologyDetails)
: new ExecSorterByConnectionCount(topologyDetails));
* Set the pluggable sorter for ExecutorDetails.
* @param execSorter to use for sorting executorDetails when scheduling.
protected void setExecSorter(IExecSorter execSorter) {
this.execSorter = execSorter;
* Set the pluggable sorter for Nodes.
* @param nodeSorter to use for sorting nodes when scheduling.
protected void setNodeSorter(INodeSorter nodeSorter) {
this.nodeSorter = nodeSorter;
private static long computeMaxSchedulingTimeMs(Map<String, Object> topoConf) {
// expect to be killed by DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY seconds, terminate slightly before
int daemonMaxTimeSec = ObjectReader.getInt(topoConf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60);
int confMaxTimeSec = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), daemonMaxTimeSec);
return (confMaxTimeSec >= daemonMaxTimeSec) ? daemonMaxTimeSec * 1000L - 200L : confMaxTimeSec * 1000L;
public static int getMaxStateSearchFromTopoConf(Map<String, Object> topoConf) {
int confMaxStateSearch;
if (topoConf.containsKey(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)) {
//this config is always set for topologies of 2.0 or newer versions since it is in defaults.yaml file
//topologies of older versions can also use it if configures it explicitly
confMaxStateSearch = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
} else {
// For backwards compatibility
confMaxStateSearch = 10_000;
return confMaxStateSearch;
public static boolean isOrderByProximity(Map<String, Object> topoConf) {
return ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS), false);
* Create an instance of {@link SchedulingSearcherState}. This method is called by
* {@link #prepareForScheduling(Cluster, TopologyDetails)} and depends on variables initialized therein prior.
* @return a new instance of {@link SchedulingSearcherState}.
private SchedulingSearcherState createSearcherState() {
Map<WorkerSlot, Map<String, Integer>> workerCompCnts = new HashMap<>();
Map<RasNode, Map<String, Integer>> nodeCompCnts = new HashMap<>();
//populate with existing assignments
SchedulerAssignment existingAssignment = cluster.getAssignmentById(topologyDetails.getId());
if (existingAssignment != null) {
existingAssignment.getExecutorToSlot().forEach((exec, ws) -> {
String compId = execToComp.get(exec);
RasNode node = nodes.getNodeById(ws.getNodeId());
Map<String, Integer> compCnts = nodeCompCnts.computeIfAbsent(node, (k) -> new HashMap<>());
compCnts.put(compId, compCnts.getOrDefault(compId, 0) + 1); // increment
//populate worker to comp assignments
compCnts = workerCompCnts.computeIfAbsent(ws, (k) -> new HashMap<>());
compCnts.put(compId, compCnts.getOrDefault(compId, 0) + 1); // increment
return new SchedulingSearcherState(workerCompCnts, nodeCompCnts,
maxStateSearch, maxSchedulingTimeMs, new ArrayList<>(unassignedExecutors), topologyDetails, execToComp);
* Check scheduling feasibility for a quick failure as the second step in {@link #schedule(Cluster, TopologyDetails)}.
* If scheduling is not possible, then return a SchedulingStatus object with a failure status.
* If fully scheduled then return a successful SchedulingStatus.
* This method can be extended by subclasses {@link ConstraintSolverStrategy#checkSchedulingFeasibility()}
* to check for additional failure conditions.
* @return A non-null {@link SchedulingResult} to terminate scheduling, otherwise return null to continue scheduling.
protected SchedulingResult checkSchedulingFeasibility() {
if (unassignedExecutors.isEmpty()) {
return SchedulingResult.success("Fully Scheduled by " + this.getClass().getSimpleName());
String err;
if (nodes.getNodes().size() <= 0) {
err = "No available nodes to schedule tasks on!";
LOG.warn("Topology {}:{}", topoName, err);
return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, err);
if (!topologyDetails.hasSpouts()) {
err = "Cannot find a Spout!";
LOG.error("Topology {}:{}", topoName, err);
return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, err);
int execCnt = unassignedExecutors.size();
if (execCnt >= maxStateSearch) {
err = String.format("Unassignerd Executor count (%d) is greater than searchable state count %d", execCnt, maxStateSearch);
LOG.error("Topology {}:{}", topoName, err);
return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, err);
return null;
* Check if the assignment of the executor to the worker is valid. In simple cases,
* this is simply a check of {@link RasNode#wouldFit(WorkerSlot, ExecutorDetails, TopologyDetails)}.
* This method may be extended by subclasses to add additional checks,
* see {@link ConstraintSolverStrategy#isExecAssignmentToWorkerValid(ExecutorDetails, WorkerSlot)}.
* @param exec being scheduled.
* @param worker on which to schedule.
* @return true if executor can be assigned to the worker, false otherwise.
protected boolean isExecAssignmentToWorkerValid(ExecutorDetails exec, WorkerSlot worker) {
//check resources
RasNode node = nodes.getNodeById(worker.getNodeId());
if (!node.wouldFit(worker, exec, topologyDetails)) {
LOG.trace("Topology {}, executor {} would not fit in resources available on worker {}", topoName, exec, worker);
return false;
return true;
* Log a bunch of stuff for debugging.
private void logClusterInfo() {
if (LOG.isDebugEnabled()) {
for (Map.Entry<String, List<String>> clusterEntry : networkTopography.entrySet()) {
String rackId = clusterEntry.getKey();
LOG.debug("Rack: {}", rackId);
for (String nodeHostname : clusterEntry.getValue()) {
for (RasNode node : hostnameToNodes(nodeHostname)) {
LOG.debug("-> Node: {} {}", node.getHostname(), node.getId());
"--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
"--> Total Resources: {Mem {}, CPU {} Slots: {}}",
* hostname to Ids.
* @param hostname the hostname.
* @return the ids n that node.
public List<RasNode> hostnameToNodes(String hostname) {
return hostnameToNodes.getOrDefault(hostname, Collections.emptyList());
* Find RASNode for specified node id.
* @param id the node/supervisor id to lookup
* @return a RASNode object
public RasNode idToNode(String id) {
RasNode ret = nodes.getNodeById(id);
if (ret == null) {
LOG.error("Cannot find Node with Id: {}", id);
return ret;
* Try to schedule till successful or till limits (backtrack count or time) have been exceeded.
* @param orderedExecutors Executors sorted in the preferred order cannot be null.
* @param sortedNodesIter Node iterable which may be null.
* @return SchedulingResult with success attribute set to true or false indicting whether ALL executors were assigned.
protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> orderedExecutors, Iterable<String> sortedNodesIter) {
long startTimeMilli = System.currentTimeMillis();
int maxExecCnt = searcherState.getExecSize();
// following three are state information at each "execIndex" level
int progressIdx = -1;
int[] progressIdxForExec = new int[maxExecCnt];
RasNode[] nodeForExec = new RasNode[maxExecCnt];
WorkerSlot[] workerSlotForExec = new WorkerSlot[maxExecCnt];
for (int i = 0; i < maxExecCnt ; i++) {
progressIdxForExec[i] = -1;
}"scheduleExecutorsOnNodes: will assign {} executors for topo {}", maxExecCnt, topoName);
for (int loopCnt = 0 ; true ; loopCnt++) {
LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}", loopCnt, searcherState.getExecIndex(), topoName);
if (searcherState.areSearchLimitsExceeded()) {
LOG.warn("Limits exceeded, backtrackCnt={}, loopCnt={}, topo={}", searcherState.getNumBacktrack(), loopCnt, topoName);
return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
if (Thread.currentThread().isInterrupted()) {
return searcherState.createSchedulingResult(false, this.getClass().getSimpleName());
int execIndex = searcherState.getExecIndex();
ExecutorDetails exec = searcherState.currentExec();
String comp = execToComp.get(exec);
if (sortedNodesIter == null || (this.sortNodesForEachExecutor && searcherState.isExecCompDifferentFromPrior())) {
progressIdx = -1;
sortedNodesIter = nodeSorter.sortAllNodes(exec);
for (String nodeId : sortedNodesIter) {
RasNode node = nodes.getNodeById(nodeId);
if (!node.couldEverFit(exec, topologyDetails)) {
for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
if (progressIdx <= progressIdxForExec[execIndex]) {
if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
if (searcherState.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more."scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
loopCnt, System.currentTimeMillis() - startTimeMilli,
Time.currentTimeMillis() - searcherState.startTimeMillis,
return searcherState.createSchedulingResult(true, this.getClass().getSimpleName());
searcherState = searcherState.nextExecutor();
nodeForExec[execIndex] = node;
workerSlotForExec[execIndex] = workerSlot;
LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}, slot-ordinal={} at loopCnt={}, topo={}",
execIndex, comp, nodeId, progressIdx, loopCnt, topoName);
sortedNodesIter = null;
// if here, then the executor was not assigned, backtrack;
LOG.debug("scheduleExecutorsOnNodes: Failed to schedule execId={}, comp={} at loopCnt={}, topo={}",
execIndex, comp, loopCnt, topoName);
if (execIndex == 0) {
} else {
searcherState.backtrack(execToComp, nodeForExec[execIndex - 1], workerSlotForExec[execIndex - 1]);
progressIdxForExec[execIndex] = -1;
boolean success = searcherState.areAllExecsScheduled();"scheduleExecutorsOnNodes: Scheduled={} in {} milliseconds, state.elapsedtime={}, backtrackCnt={}, topo={}",
success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - searcherState.startTimeMillis,
return searcherState.createSchedulingResult(success, this.getClass().getSimpleName());