blob: cacba7a722502b395de3a770b08c21e8a7531a39 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.scheduler.resource.strategies.scheduling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RasNode;
import org.apache.storm.scheduler.resource.RasNodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
//the max number of states to search is taken from the topology and daemon configs (see schedule)
private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
//constraints and spreads
private Map<String, Map<String, Integer>> constraintMatrix;
private HashSet<String> spreadComps = new HashSet<>();
private Map<String, RasNode> nodes;
private Map<ExecutorDetails, String> execToComp;
private Map<String, Set<ExecutorDetails>> compToExecs;
private List<String> favoredNodeIds;
private List<String> unFavoredNodeIds;
/**
 * Builds the component exclusion matrix for a topology.
 *
 * <p>Every ordered pair of the given components starts at 0. Each pair listed under
 * {@code Config.TOPOLOGY_RAS_CONSTRAINTS} in the topology conf is then set to 1 in both directions. A pair that
 * names a component missing from {@code comps} is logged and skipped.
 *
 * @param topo  the topology whose conf supplies the constraint pairs
 * @param comps the component ids the matrix is built over
 * @return component id -> (component id -> 0 or 1)
 */
static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) {
    Map<String, Map<String, Integer>> constraintsByComp = new HashMap<>();
    for (String rowComp : comps) {
        Map<String, Integer> row = new HashMap<>();
        for (String colComp : comps) {
            row.put(colComp, 0);
        }
        constraintsByComp.put(rowComp, row);
    }

    List<List<String>> declaredPairs = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINTS);
    if (declaredPairs == null) {
        return constraintsByComp;
    }

    for (List<String> pair : declaredPairs) {
        String first = pair.get(0);
        String second = pair.get(1);
        Map<String, Integer> firstRow = constraintsByComp.get(first);
        if (firstRow == null) {
            LOG.warn("Comp: {} declared in constraints is not valid!", first);
            continue;
        }
        Map<String, Integer> secondRow = constraintsByComp.get(second);
        if (secondRow == null) {
            LOG.warn("Comp: {} declared in constraints is not valid!", second);
            continue;
        }
        // the exclusion holds in both directions
        firstRow.put(second, 1);
        secondRow.put(first, 1);
    }
    return constraintsByComp;
}
/**
 * Determines if a scheduling is valid and all constraints are satisfied.
 *
 * <p>The checks run in this order and stop at the first failure: spread placement, component exclusion
 * constraints, resource bookkeeping.
 *
 * @param cluster the cluster holding the assignment to validate
 * @param td      the topology whose assignment is validated
 * @return true only if all three checks pass
 */
@VisibleForTesting
public static boolean validateSolution(Cluster cluster, TopologyDetails td) {
    if (!checkSpreadSchedulingValid(cluster, td)) {
        return false;
    }
    if (!checkConstraintsSatisfied(cluster, td)) {
        return false;
    }
    return checkResourcesCorrect(cluster, td);
}
/**
 * Check if constraints are satisfied.
 *
 * <p>Groups the components of the topology's assignment by worker slot and fails as soon as two different
 * components sharing a slot have a non-zero entry in the constraint matrix.
 *
 * @param cluster the cluster holding the topology's assignment
 * @param topo    the topology to check
 * @return true if no worker hosts a pair of mutually excluded components
 */
private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) {
    LOG.info("Checking constraints...");
    assert (cluster.getAssignmentById(topo.getId()) != null);
    Map<ExecutorDetails, WorkerSlot> execToSlot = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    //get topology constraints
    Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values()));

    // collect the distinct components hosted by each worker
    Map<WorkerSlot, Set<String>> compsByWorker = new HashMap<>();
    for (Map.Entry<ExecutorDetails, WorkerSlot> placement : execToSlot.entrySet()) {
        String comp = execToComp.get(placement.getKey());
        compsByWorker.computeIfAbsent(placement.getValue(), (k) -> new HashSet<>()).add(comp);
    }

    for (Map.Entry<WorkerSlot, Set<String>> slotAndComps : compsByWorker.entrySet()) {
        Set<String> colocated = slotAndComps.getValue();
        for (String left : colocated) {
            for (String right : colocated) {
                if (left.equals(right)) {
                    continue;
                }
                if (constraintMatrix.get(left).get(right) != 0) {
                    LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}",
                              left, right, slotAndComps.getKey());
                    return false;
                }
            }
        }
    }
    return true;
}
/**
 * Maps every used worker slot in the cluster to the node that reports it as used.
 *
 * @param cluster the cluster to inspect
 * @return used worker slot -> owning node
 */
private static Map<WorkerSlot, RasNode> workerToNodes(Cluster cluster) {
    Map<WorkerSlot, RasNode> slotOwners = new HashMap<>();
    RasNodes.getAllNodesFrom(cluster).values()
        .forEach(owner -> owner.getUsedSlots().forEach(slot -> slotOwners.put(slot, owner)));
    return slotOwners;
}
/**
 * Checks that the topology's assignment respects the spread constraint.
 *
 * <p>Fails immediately if an executor is seen twice on the same worker. For components declared as spread, every
 * node may host the component at most once; each violation is logged and the remaining executors are still
 * examined so that all violations get reported.
 *
 * @param cluster the cluster holding the topology's assignment
 * @param topo    the topology to check
 * @return true if no duplicate executor and no spread violation was found
 */
private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
    LOG.info("Checking for a valid scheduling...");
    assert (cluster.getAssignmentById(topo.getId()) != null);
    Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    Map<WorkerSlot, Set<ExecutorDetails>> workerExecMap = new HashMap<>();
    Map<RasNode, Set<String>> nodeCompMap = new HashMap<>();
    Map<WorkerSlot, RasNode> workerToNodes = workerToNodes(cluster);
    Set<String> spreadComps = getSpreadComps(topo);

    boolean ret = true;
    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
        ExecutorDetails exec = entry.getKey();
        WorkerSlot worker = entry.getValue();
        RasNode node = workerToNodes.get(worker);

        // Set.add returns false when the executor was already recorded for this worker
        if (!workerExecMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(exec)) {
            LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
            return false;
        }

        String comp = execToComp.get(exec);
        Set<String> compsOnNode = nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>());
        if (spreadComps.contains(comp) && compsOnNode.contains(comp)) {
            LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
                      comp, exec, node.getId(), compsOnNode);
            // keep going so every violation is logged
            ret = false;
        }
        compsOnNode.add(comp);
    }
    return ret;
}
/**
 * Check if resource constraints satisfied.
 *
 * <p>First pass: every node hosting an executor of the topology must not report negative available memory or
 * negative available CPU. Second pass: for each such node, the available CPU and memory must equal the node's
 * total minus the requirements of the topology's executors placed on it.
 *
 * @param cluster the cluster holding the topology's assignment
 * @param topo    the topology to check
 * @return true if both passes succeed for every node used by the topology
 */
private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
    LOG.info("Checking Resources...");
    assert (cluster.getAssignmentById(topo.getId()) != null);
    Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
    Map<String, RasNode> nodes = RasNodes.getAllNodesFrom(cluster);
    Map<RasNode, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>();

    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
        ExecutorDetails exec = entry.getKey();
        WorkerSlot worker = entry.getValue();
        RasNode node = nodes.get(worker.getNodeId());
        // either resource going negative means the node is over-committed
        if (node.getAvailableMemoryResources() < 0.0 || node.getAvailableCpuResources() < 0.0) {
            LOG.error("Incorrect Scheduling: found node with negative available resources");
            return false;
        }
        nodeToExecs.computeIfAbsent(node, (k) -> new HashSet<>()).add(exec);
    }

    for (Map.Entry<RasNode, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) {
        RasNode node = entry.getKey();
        Collection<ExecutorDetails> execs = entry.getValue();
        double cpuUsed = 0.0;
        double memoryUsed = 0.0;
        for (ExecutorDetails exec : execs) {
            cpuUsed += topo.getTotalCpuReqTask(exec);
            memoryUsed += topo.getTotalMemReqTask(exec);
        }
        // NOTE(review): these are exact double comparisons; confirm the node accounting cannot drift by rounding
        if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) {
            LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. Expected: {}"
                      + " Actual: {} Executors scheduled on node: {}",
                      node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs);
            return false;
        }
        if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) {
            LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}"
                      + " Actual: {} Executors scheduled on node: {}",
                      node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs);
            return false;
        }
    }
    return true;
}
/**
 * Reads the components declared for spreading from the topology conf.
 *
 * <p>Only names that are components of the topology are kept; any other name is logged and dropped.
 *
 * @param topo the topology whose conf is read
 * @return the valid spread component ids, empty if none are declared
 */
private static HashSet<String> getSpreadComps(TopologyDetails topo) {
    HashSet<String> validSpread = new HashSet<>();
    List<String> declared = (List<String>) topo.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS);
    if (declared == null) {
        return validSpread;
    }
    Set<String> knownComps = topo.getComponents().keySet();
    for (String candidate : declared) {
        if (!knownComps.contains(candidate)) {
            LOG.warn("Comp {} declared for spread not valid", candidate);
            continue;
        }
        validSpread.add(candidate);
    }
    return validSpread;
}
/**
 * Schedules the unassigned executors of a topology with a backtracking constraint search.
 *
 * <p>Loads the search limits, favored/unfavored nodes, exclusion constraints and spread components from the
 * configs, seeds the search with the components of any existing assignment, and then either fails early when
 * the problem is detected to be infeasible or runs the backtracking search.
 *
 * @param cluster the cluster to schedule on
 * @param td      the topology to schedule
 * @return the outcome of the feasibility check or of the search
 */
@Override
public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    prepare(cluster);
    LOG.debug("Scheduling {}", td.getId());
    nodes = RasNodes.getAllNodesFrom(cluster);

    // search limits: the state limit is the smaller of the topology and the daemon settings
    int topoStateLimit = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
    int daemonStateLimit = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
    final int maxStateSearch = Math.min(daemonStateLimit, topoStateLimit);
    final long maxTimeMs = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1) * 1000L;

    favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
    unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));

    // executor <-> component lookups, exclusion matrix and spread components
    execToComp = td.getExecutorToComponent();
    compToExecs = getCompToExecs(execToComp);
    constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    spreadComps = getSpreadComps(td);

    // keep only the still-unassigned executors, in the constraint-based sort order
    Set<ExecutorDetails> stillUnassigned = new HashSet<>(cluster.getUnassignedExecutors(td));
    List<ExecutorDetails> execsToPlace = new ArrayList<>();
    for (ExecutorDetails candidate : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) {
        if (stillUnassigned.contains(candidate)) {
            execsToPlace.add(candidate);
        }
    }

    // seed the search with what is already placed
    Map<WorkerSlot, Set<String>> compsByWorker = new HashMap<>();
    Map<RasNode, Set<String>> compsByNode = new HashMap<>();
    SchedulerAssignment current = cluster.getAssignmentById(td.getId());
    if (current != null) {
        for (Map.Entry<ExecutorDetails, WorkerSlot> placed : current.getExecutorToSlot().entrySet()) {
            String compId = execToComp.get(placed.getKey());
            WorkerSlot slot = placed.getValue();
            RasNode host = nodes.get(slot.getNodeId());
            compsByNode.computeIfAbsent(host, (k) -> new HashSet<>()).add(compId);
            compsByWorker.computeIfAbsent(slot, (k) -> new HashSet<>()).add(compId);
        }
    }

    //early detection/early fail
    if (!checkSchedulingFeasibility(maxStateSearch)) {
        //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
        return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
    }

    SearcherState initialState = new SearcherState(compsByWorker, compsByNode, maxStateSearch, maxTimeMs, execsToPlace, td);
    SolverResult outcome = backtrackSearch(initialState);
    return outcome.asSchedulingResult();
}
/**
 * Cheap pre-check that rejects problems the search cannot solve.
 *
 * <p>Fails when a spread component has more executors than there are nodes, or when the number of executors of
 * the topology reaches the state search limit.
 *
 * @param maxStateSearch the maximum number of states the search may visit
 * @return true if neither condition rules the scheduling out
 */
private boolean checkSchedulingFeasibility(int maxStateSearch) {
    for (String spreadComp : spreadComps) {
        int execCount = compToExecs.get(spreadComp).size();
        if (execCount <= nodes.size()) {
            continue;
        }
        LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger "
                  + "than number of nodes: {}", spreadComp, execCount, nodes.size());
        return false;
    }
    if (execToComp.size() < maxStateSearch) {
        return true;
    }
    LOG.error("Number of executors is greater than the maximum number of states allowed to be searched. "
              + "# of executors: {} Max states to search: {}", execToComp.size(), maxStateSearch);
    return false;
}
/**
 * Sorts the object resources by delegating to {@code GenericResourceAwareStrategy.sortObjectResourcesImpl}.
 */
@Override
protected TreeSet<ObjectResources> sortObjectResources(
    final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
    final ExistingScheduleFunc existingScheduleFunc) {
    final TreeSet<ObjectResources> sorted =
        GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
    return sorted;
}
/**
 * Recursively searches for a placement of the executors in the state, handling one executor per call.
 *
 * <p>Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space.
 *
 * <p>Each call counts as one searched state. For the current executor, every slot available on every node (nodes
 * in the order returned by sortAllNodes) is tested with isExecAssignmentToWorkerValid. A valid slot is tentatively
 * assigned and the search recurses for the next executor; if that recursion fails and the limits still allow it,
 * the assignment is undone and the next slot is tried.
 *
 * @param state the mutable search state; the same instance is passed down the recursion
 * @return a successful result once the last executor has been placed, otherwise an unsuccessful result when the
 *     search limits are exceeded, the thread is interrupted, or no slot worked for the current executor
 */
@VisibleForTesting
protected SolverResult backtrackSearch(SearcherState state) {
// every invocation counts as one visited state
state.incStatesSearched();
if (state.areSearchLimitsExceeded()) {
LOG.warn("Limits Exceeded");
return new SolverResult(state, false);
}
// stop with a failure result when the scheduling thread has been interrupted
if (Thread.currentThread().isInterrupted()) {
return new SolverResult(state, false);
}
ExecutorDetails exec = state.currentExec();
Iterable<String> sortedNodes = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
for (String nodeId: sortedNodes) {
RasNode node = nodes.get(nodeId);
for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
if (isExecAssignmentToWorkerValid(workerSlot, state)) {
// tentatively place the executor; undone by state.backtrack below if the deeper search fails
state.tryToSchedule(execToComp, node, workerSlot);
if (state.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
return new SolverResult(state, true);
}
// nextExecutor() advances the shared state and returns it, so the recursion continues on the same object
SolverResult results = backtrackSearch(state.nextExecutor());
if (results.success) {
//We found a good result we are done.
return results;
}
if (state.areSearchLimitsExceeded()) {
//No need to search more it is not going to help.
// note: the tentative assignment is not undone on this path
return new SolverResult(state, false);
}
//backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
state.backtrack(execToComp, node, workerSlot);
}
}
}
//Tried all of the slots and none of them worked.
return new SolverResult(state, false);
}
/**
 * Check if any constraints are violated if exec is scheduled on worker.
 *
 * <p>Three conditions are tested in order for the state's current executor: it must fit in the resources of the
 * worker's node, its component must not be excluded by any component already on the worker, and if its component
 * is a spread component the node must not already host that component.
 *
 * @param worker the candidate worker slot
 * @param state  the search state supplying the current executor and the assignments made so far
 * @return true if scheduling exec on worker does not violate any constraints, returns false if it does
 */
public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) {
    final ExecutorDetails candidate = state.currentExec();

    //check resources
    RasNode hostNode = nodes.get(worker.getNodeId());
    boolean fits = hostNode.wouldFit(worker, candidate, state.td);
    if (!fits) {
        LOG.trace("{} would not fit in resources available on {}", candidate, worker);
        return false;
    }

    //check if exec can be on worker based on user defined component exclusions
    String candidateComp = execToComp.get(candidate);
    Set<String> colocated = state.workerCompAssignment.get(worker);
    if (colocated != null) {
        Map<String, Integer> exclusions = constraintMatrix.get(candidateComp);
        for (String other : colocated) {
            if (exclusions.get(other) != 0) {
                LOG.trace("{} found {} constraint violation {} on {}", candidate, candidateComp, other, worker);
                return false;
            }
        }
    }

    //check if exec satisfy spread
    if (!spreadComps.contains(candidateComp)) {
        return true;
    }
    Set<String> compsOnNode = state.nodeCompAssignment.computeIfAbsent(hostNode, (k) -> new HashSet<>());
    if (compsOnNode.contains(candidateComp)) {
        LOG.trace("{} Found spread violation {} on node {}", candidate, candidateComp, hostNode.getId());
        return false;
    }
    return true;
}
/**
 * Inverts an executor-to-component map.
 *
 * @param executorToComp executor -> component id
 * @return component id -> the executors mapped to it
 */
private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) {
    Map<String, Set<ExecutorDetails>> execsByComp = new HashMap<>();
    for (Map.Entry<ExecutorDetails, String> pairing : executorToComp.entrySet()) {
        execsByComp.computeIfAbsent(pairing.getValue(), (k) -> new HashSet<>()).add(pairing.getKey());
    }
    return execsByComp;
}
/**
 * Lists all executors grouped by component, components with the most constraints first.
 *
 * <p>A component's constraint count is the sum of its row in the constraint matrix, plus one if it is a spread
 * component. Components are ordered with sortByValues on those counts.
 *
 * @param spreadComps      the spread component ids
 * @param constraintMatrix component id -> (component id -> 0 or 1)
 * @param compToExecs      component id -> its executors
 * @return the executors in component order
 */
private ArrayList<ExecutorDetails> getSortedExecs(HashSet<String> spreadComps, Map<String, Map<String, Integer>> constraintMatrix,
                                                  Map<String, Set<ExecutorDetails>> compToExecs) {
    //Key->Comp Value-># of constraints
    Map<String, Integer> constraintCounts = new HashMap<>();
    for (Map.Entry<String, Map<String, Integer>> row : constraintMatrix.entrySet()) {
        int total = 0;
        for (Integer flag : row.getValue().values()) {
            total += flag;
        }
        //a spread declaration counts as one more constraint
        if (spreadComps.contains(row.getKey())) {
            total++;
        }
        constraintCounts.put(row.getKey(), total);
    }

    //emit the executors component by component, following the sorted component order
    ArrayList<ExecutorDetails> ordered = new ArrayList<>();
    for (String comp : sortByValues(constraintCounts).keySet()) {
        ordered.addAll(compToExecs.get(comp));
    }
    return ordered;
}
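/**
* Used to sort a Map by the values, in descending order. Keys with equal values are ordered by descending key.
*
* @param map the map to sort
* @return a NavigableMap holding the entries of map in that order
*/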
@VisibleForTesting
public <K extends Comparable<K>, V extends Comparable<V>> NavigableMap<K, V> sortByValues(final Map<K, V> map) {
    // Order by a private snapshot of the values. The comparator lives as long as the returned TreeMap, so reading
    // the caller's map from it would corrupt ordering and lookups if the caller changed that map afterwards.
    final Map<K, V> snapshot = new HashMap<>(map);
    // larger values first; equal values are ordered by descending key so distinct keys never compare as equal
    Comparator<K> valueComparator = (k1, k2) -> {
        int compare = snapshot.get(k2).compareTo(snapshot.get(k1));
        return compare != 0 ? compare : k2.compareTo(k1);
    };
    NavigableMap<K, V> sortedByValues = new TreeMap<>(valueComparator);
    sortedByValues.putAll(snapshot);
    return sortedByValues;
}
/**
 * Immutable summary of a search: whether it succeeded, plus the counters read from the state when the result
 * was created.
 */
protected static class SolverResult {
    private final int statesSearched;
    private final boolean success;
    private final long timeTakenMillis;
    private final int backtracked;

    /**
     * Captures the counters of the given state.
     *
     * @param state   the search state to read the states searched, start time and backtrack count from
     * @param success whether the search found a complete scheduling
     */
    public SolverResult(SearcherState state, boolean success) {
        this.success = success;
        this.statesSearched = state.getStatesSearched();
        this.backtracked = state.numBacktrack;
        this.timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis;
    }

    /**
     * Converts this result into a SchedulingResult carrying the search statistics in its message.
     *
     * @return a success result, or a FAIL_NOT_ENOUGH_RESOURCES failure if the search did not succeed
     */
    public SchedulingResult asSchedulingResult() {
        // both messages end with the same statistics
        final String stats = statesSearched
            + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)";
        if (!success) {
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
                                            "Cannot find scheduling that satisfies all constraints (" + stats);
        }
        return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + stats);
    }
}
/**
 * Mutable state of one backtracking search: the executors to place, the index of the executor currently being
 * placed, the component bookkeeping per worker and per node, the search limits and the search counters.
 */
protected static class SearcherState {
    // Time at which this state was constructed.
    final long startTimeMillis;
    // Time after which the search limits count as exceeded (Long.MAX_VALUE when no time limit is set).
    private final long maxEndTimeMs;
    // A map of the worker to the components in the worker to be able to enforce constraints.
    private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    // Per executor index: true if tryToSchedule added the component to the worker's set, so backtrack may remove it.
    private final boolean[] okToRemoveFromWorker;
    // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints
    private final Map<RasNode, Set<String>> nodeCompAssignment;
    // Per executor index: true if tryToSchedule added the component to the node's set, so backtrack may remove it.
    private final boolean[] okToRemoveFromNode;
    // Static State
    // The list of all executors (preferably sorted to make assignments simpler).
    private final List<ExecutorDetails> execs;
    //The maximum number of state to search before stopping.
    private final int maxStatesSearched;
    //The topology we are scheduling
    private final TopologyDetails td;
    // Metrics
    // How many states searched so far.
    private int statesSearched = 0;
    // Number of times we had to backtrack.
    private int numBacktrack = 0;
    // Current state
    // The current executor we are trying to schedule
    private int execIndex = 0;

    /**
     * Creates the initial state, positioned on the first executor.
     *
     * @param workerCompAssignment worker -> components already on it
     * @param nodeCompAssignment   node -> components already on it
     * @param maxStatesSearched    maximum number of states to search
     * @param maxTimeMs            maximum search time in milliseconds; zero or negative means no time limit
     * @param execs                the executors to place, must be non-null and non-empty
     * @param td                   the topology being scheduled
     */
    private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RasNode, Set<String>> nodeCompAssignment,
                          int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) {
        // the null check has to come first, otherwise isEmpty() dereferences execs before it is checked
        assert execs != null;
        assert !execs.isEmpty();
        this.workerCompAssignment = workerCompAssignment;
        this.nodeCompAssignment = nodeCompAssignment;
        this.maxStatesSearched = maxStatesSearched;
        this.execs = execs;
        okToRemoveFromWorker = new boolean[execs.size()];
        okToRemoveFromNode = new boolean[execs.size()];
        this.td = td;
        startTimeMillis = Time.currentTimeMillis();
        if (maxTimeMs <= 0) {
            maxEndTimeMs = Long.MAX_VALUE;
        } else {
            maxEndTimeMs = startTimeMillis + maxTimeMs;
        }
    }

    /**
     * Counts one more searched state; at debug level the counters are logged every 1000 states.
     */
    public void incStatesSearched() {
        statesSearched++;
        if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
            LOG.debug("States Searched: {}", statesSearched);
            LOG.debug("backtrack: {}", numBacktrack);
        }
    }

    /**
     * Returns the number of states searched so far.
     */
    public int getStatesSearched() {
        return statesSearched;
    }

    /**
     * Returns true once more states than allowed were searched or the end time has passed.
     */
    public boolean areSearchLimitsExceeded() {
        return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
    }

    /**
     * Advances to the next executor.
     *
     * @return this same state
     * @throws IllegalStateException if there is no next executor
     */
    public SearcherState nextExecutor() {
        execIndex++;
        if (execIndex >= execs.size()) {
            throw new IllegalStateException("Internal Error: exceeded the exec limit " + execIndex + " >= " + execs.size());
        }
        return this;
    }

    /**
     * Returns true when the current executor is the last one in the list.
     */
    public boolean areAllExecsScheduled() {
        return execIndex == execs.size() - 1;
    }

    /**
     * Returns the executor at the current index.
     */
    public ExecutorDetails currentExec() {
        return execs.get(execIndex);
    }

    /**
     * Tentatively assigns the current executor to the given worker slot on the given node.
     *
     * @param execToComp executor -> component id
     * @param node       the node owning the slot
     * @param workerSlot the slot to assign the current executor to
     */
    public void tryToSchedule(Map<ExecutorDetails, String> execToComp, RasNode node, WorkerSlot workerSlot) {
        ExecutorDetails exec = currentExec();
        String comp = execToComp.get(exec);
        LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot);
        //It is possible that this component is already scheduled on this node or worker. If so when we backtrack we cannot remove it
        okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp);
        okToRemoveFromNode[execIndex] = nodeCompAssignment.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
        node.assignSingleExecutor(workerSlot, exec, td);
    }

    /**
     * Steps back to the previous executor and undoes its tentative assignment.
     *
     * @param execToComp executor -> component id
     * @param node       the node the previous executor was assigned to
     * @param workerSlot the slot the previous executor was assigned to
     * @throws IllegalStateException if there is no previous executor
     */
    public void backtrack(Map<ExecutorDetails, String> execToComp, RasNode node, WorkerSlot workerSlot) {
        execIndex--;
        if (execIndex < 0) {
            throw new IllegalStateException("Internal Error: exec index became negative");
        }
        numBacktrack++;
        ExecutorDetails exec = currentExec();
        String comp = execToComp.get(exec);
        LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot);
        // only remove the component if this executor's tryToSchedule was the one that added it
        if (okToRemoveFromWorker[execIndex]) {
            workerCompAssignment.get(workerSlot).remove(comp);
            okToRemoveFromWorker[execIndex] = false;
        }
        if (okToRemoveFromNode[execIndex]) {
            nodeCompAssignment.get(node).remove(comp);
            okToRemoveFromNode[execIndex] = false;
        }
        node.freeSingleExecutor(exec, td);
    }
}
}