blob: 8f15aa3184509d0b63ce75f1dcc0330b74cb3636 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RasNode;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchedulingSearcherState {
private static final Logger LOG = LoggerFactory.getLogger(SchedulingSearcherState.class);
final long startTimeMillis;
private final long maxEndTimeMs;
// A map of the worker to the components in the worker to be able to enforce constraints.
private final Map<WorkerSlot, Map<String, Integer>> workerCompAssignmentCnts;
private final boolean[] okToRemoveFromWorker;
// for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
private final Map<RasNode, Map<String, Integer>> nodeCompAssignmentCnts;
private final boolean[] okToRemoveFromNode;
// Static State
// The list of all executors (preferably sorted to make assignments simpler).
private List<ExecutorDetails> execs;
//The maximum number of state to search before stopping.
private final int maxStatesSearched;
//The topology we are scheduling
private final TopologyDetails td;
private final String topoName;
// Metrics
// How many states searched so far.
private int statesSearched = 0;
// Number of times we had to backtrack.
private int numBacktrack = 0;
// Current state
// The current executor we are trying to schedule
private int execIndex = 0;
private final Map<ExecutorDetails, String> execToComp;
public SchedulingSearcherState(Map<WorkerSlot, Map<String, Integer>> workerCompAssignmentCnts,
Map<RasNode, Map<String, Integer>> nodeCompAssignmentCnts, int maxStatesSearched, long maxTimeMs,
List<ExecutorDetails> execs, TopologyDetails td, Map<ExecutorDetails, String> execToComp) {
assert execs != null;
this.workerCompAssignmentCnts = workerCompAssignmentCnts;
this.nodeCompAssignmentCnts = nodeCompAssignmentCnts;
this.maxStatesSearched = maxStatesSearched;
this.execs = execs;
okToRemoveFromWorker = new boolean[execs.size()];
okToRemoveFromNode = new boolean[execs.size()]; = td;
this.topoName = td.getName();
startTimeMillis = Time.currentTimeMillis();
if (maxTimeMs <= 0) {
maxEndTimeMs = Long.MAX_VALUE;
} else {
maxEndTimeMs = startTimeMillis + maxTimeMs;
this.execToComp = execToComp;
* Reassign the list of executors as long as it contains the same executors as before.
* Executors are normally assigned when this class is instantiated. However, this
* list may be resorted externally and then reassigned.
* @param sortedExecs new list to be assigned.
public void setSortedExecs(List<ExecutorDetails> sortedExecs) {
if (execs == null || new HashSet<>(execs).equals(new HashSet<>(sortedExecs))) {
this.execs = sortedExecs;
} else {
String err = String.format("executors in sorted list (cnt=%d) are different from initial assignment (cnt=%d), topo=%s)",
sortedExecs.size(), execs.size(), topoName);
throw new IllegalArgumentException(err);
public void incStatesSearched() {
if (statesSearched % 1_000 == 0) {
LOG.debug("Topology {} States Searched: {}", topoName, statesSearched);
LOG.debug("Topology {} backtrack: {}", topoName, numBacktrack);
public long getStartTimeMillis() {
return startTimeMillis;
public int getStatesSearched() {
return statesSearched;
public int getExecSize() {
return execs.size();
public int getNumBacktrack() {
return numBacktrack;
public int getExecIndex() {
return execIndex;
public boolean areSearchLimitsExceeded() {
return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
public SchedulingSearcherState nextExecutor() {
if (execIndex >= execs.size()) {
String err = String.format("Internal Error: topology %s: execIndex exceeded limit %d >= %d", topoName, execIndex, execs.size());
throw new IllegalStateException(err);
return this;
public boolean areAllExecsScheduled() {
return execIndex == execs.size() - 1;
public ExecutorDetails currentExec() {
return execs.get(execIndex);
* Assign executor to worker and node.
* Assignment validity check is done before calling this method.
* @param execToComp Mapping from executor to component name.
* @param node RasNode on which to schedule.
* @param workerSlot WorkerSlot on which to schedule.
public void assignCurrentExecutor(Map<ExecutorDetails, String> execToComp, RasNode node, WorkerSlot workerSlot) {
ExecutorDetails exec = currentExec();
String comp = execToComp.get(exec);
LOG.trace("Topology {} Trying assignment of {} {} to {}", topoName, exec, comp, workerSlot);
// It is possible that this component is already scheduled on this node or worker. If so when we backtrack we cannot remove it
Map<String, Integer> oneMap = workerCompAssignmentCnts.computeIfAbsent(workerSlot, (k) -> new HashMap<>());
oneMap.put(comp, oneMap.getOrDefault(comp, 0) + 1); // increment assignment count
okToRemoveFromWorker[execIndex] = true;
oneMap = nodeCompAssignmentCnts.computeIfAbsent(node, (k) -> new HashMap<>());
oneMap.put(comp, oneMap.getOrDefault(comp, 0) + 1); // increment assignment count
okToRemoveFromNode[execIndex] = true;
node.assignSingleExecutor(workerSlot, exec, td);
public void backtrack(Map<ExecutorDetails, String> execToComp, RasNode node, WorkerSlot workerSlot) {
if (execIndex < 0) {
throw new IllegalStateException("Internal Error: Topology " + topoName + " exec index became negative");
ExecutorDetails exec = currentExec();
String comp = execToComp.get(exec);
LOG.trace("Topology {} Backtracking {} {} from {}", topoName, exec, comp, workerSlot);
if (okToRemoveFromWorker[execIndex]) {
Map<String, Integer> oneMap = workerCompAssignmentCnts.get(workerSlot);
oneMap.put(comp, oneMap.getOrDefault(comp, 0) - 1); // decrement assignment count
okToRemoveFromWorker[execIndex] = false;
if (okToRemoveFromNode[execIndex]) {
Map<String, Integer> oneMap = nodeCompAssignmentCnts.get(node);
oneMap.put(comp, oneMap.getOrDefault(comp, 0) - 1); // decrement assignment count
okToRemoveFromNode[execIndex] = false;
node.freeSingleExecutor(exec, td);
* Use this method to log the current component assignments on the Node.
* Useful for debugging and tests.
public void logNodeCompAssignments() {
if (nodeCompAssignmentCnts == null || nodeCompAssignmentCnts.isEmpty()) {"Topology {} NodeCompAssignment is empty", topoName);
StringBuffer sb = new StringBuffer();
int cntAllNodes = 0;
int cntFilledNodes = 0;
for (RasNode node: new TreeSet<>(nodeCompAssignmentCnts.keySet())) {
Map<String, Integer> oneMap = nodeCompAssignmentCnts.get(node);
if (oneMap.isEmpty()) {
String oneMapJoined = oneMap.entrySet()
.stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue()))
sb.append(String.format("\n\t(%d) Node %s: %s", cntFilledNodes, node.getId(), oneMapJoined));
}"Topology {} NodeCompAssignments available for {} of {} nodes {}", topoName, cntFilledNodes, cntAllNodes, sb);"Topology {} Executors assignments attempted (cnt={}) are: \n\t{}",
topoName, execs.size(),","))
* Get a map of component to count for the specified worker slot.
* @param workerSlot to check for.
* @return assignment map of count for components, may be a null.
public Map<String, Integer> getCompAssignmentCntMapForWorker(WorkerSlot workerSlot) {
return workerCompAssignmentCnts.get(workerSlot);
public int getComponentCntOnNode(RasNode rasNode, String comp) {
Map<String, Integer> map = nodeCompAssignmentCnts.get(rasNode);
if (map == null) {
return 0;
return map.getOrDefault(comp, 0);
public SchedulingResult createSchedulingResult(boolean success, String schedulerClassSimpleName) {
String msg;
if (success) {
msg = String.format("Fully Scheduled by %s (%d states traversed in %d ms, backtracked %d times)",
schedulerClassSimpleName, this.getStatesSearched(),
Time.currentTimeMillis() - this.getStartTimeMillis(), this.getNumBacktrack());
return SchedulingResult.success(msg);
} else {
msg = String.format("Cannot schedule by %s (%d states traversed in %d ms, backtracked %d times, %d of %d executors scheduled)",
schedulerClassSimpleName, this.getStatesSearched(),
Time.currentTimeMillis() - this.getStartTimeMillis(), this.getNumBacktrack(),
this.getExecIndex(), this.getExecSize());
return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, msg);
* Check if the current executor has a different component from the previous one.
* This flag can be used as a quick way to check if the nodes should be sorted.
* @return true if first executor or if the component is same as previous executor. False other wise.
public boolean isExecCompDifferentFromPrior() {
if (execIndex == 0) {
return true;
// did the component change from prior executor
return execToComp.getOrDefault(execs.get(execIndex), "")
.equals(execToComp.getOrDefault(execs.get(execIndex - 1), ""));