// blob: cb3f989875343bf2b214a4d0d703a59d2ffb8914 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExecSorterByProximity implements IExecSorter {
private static final Logger LOG = LoggerFactory.getLogger(ExecSorterByProximity.class);
protected TopologyDetails topologyDetails;
public ExecSorterByProximity(TopologyDetails topologyDetails) {
this.topologyDetails = topologyDetails;
}
/**
* Order executors by network proximity needs. First add all executors for components that
* are in topological sorted order. Then add back executors not accounted for - which are
* system executors.
*
* @param unassignedExecutors an unmodifiable set of executors that need to be scheduled.
* @return a list of executors in sorted order for scheduling.
*/
public List<ExecutorDetails> sortExecutors(Set<ExecutorDetails> unassignedExecutors) {
Map<String, Component> componentMap = topologyDetails.getComponents(); // excludes system components
LinkedHashSet<ExecutorDetails> orderedExecutorSet = new LinkedHashSet<>(); // in insert order
Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
for (Component component : componentMap.values()) {
compToExecsToSchedule.put(component.getId(), new LinkedList<>());
for (ExecutorDetails exec : component.getExecs()) {
if (unassignedExecutors.contains(exec)) {
compToExecsToSchedule.get(component.getId()).add(exec);
}
}
}
List<Component> sortedComponents = topologicalSortComponents(componentMap);
for (Component currComp: sortedComponents) {
int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
for (int i = 0; i < numExecs; i++) {
orderedExecutorSet.addAll(takeExecutors(currComp, componentMap, compToExecsToSchedule));
}
}
// add executors not in sorted list - which may be system executors
orderedExecutorSet.addAll(unassignedExecutors);
return new LinkedList<>(orderedExecutorSet);
}
/**
* Sort components topologically.
* @param componentMap The map of component Id to Component Object.
* @return The sorted components
*/
private List<Component> topologicalSortComponents(final Map<String, Component> componentMap) {
LinkedHashSet<Component> sortedComponentsSet = new LinkedHashSet<>();
boolean[] visited = new boolean[componentMap.size()];
int[] inDegree = new int[componentMap.size()];
List<String> componentIds = new ArrayList<>(componentMap.keySet());
Map<String, Integer> compIdToIndex = new HashMap<>();
for (int i = 0; i < componentIds.size(); i++) {
compIdToIndex.put(componentIds.get(i), i);
}
//initialize the in-degree array
for (int i = 0; i < inDegree.length; i++) {
String compId = componentIds.get(i);
Component comp = componentMap.get(compId);
for (String childId : comp.getChildren()) {
inDegree[compIdToIndex.get(childId)] += 1;
}
}
//sorting components topologically
for (int t = 0; t < inDegree.length; t++) {
for (int i = 0; i < inDegree.length; i++) {
if (inDegree[i] == 0 && !visited[i]) {
String compId = componentIds.get(i);
Component comp = componentMap.get(compId);
sortedComponentsSet.add(comp);
visited[i] = true;
for (String childId : comp.getChildren()) {
inDegree[compIdToIndex.get(childId)]--;
}
break;
}
}
}
// add back components that could not be visited and issue warning about loop in component data flow
if (sortedComponentsSet.size() != componentMap.size()) {
String unvisitedComponentIds = componentMap.entrySet().stream()
.filter(x -> !sortedComponentsSet.contains(x.getValue()))
.map(x -> x.getKey())
.collect(Collectors.joining(","));
LOG.warn("topologicalSortComponents for topology {} detected possible loop(s) involving components {}, "
+ "appending them to the end of the sorted component list",
topologyDetails.getId(), unvisitedComponentIds);
sortedComponentsSet.addAll(componentMap.values());
}
return new ArrayList<>(sortedComponentsSet);
}
/**
* Take unscheduled executors from current and all its downstream components in a particular order.
* First, take one executor from the current component;
* then for every child (direct downstream component) of this component,
* if it's shuffle grouping from the current component to this child,
* the number of executors to take from this child is the max of
* 1 and (the number of unscheduled executors this child has / the number of unscheduled executors the current component has);
* otherwise, the number of executors to take is 1;
* for every executor to take from this child, call takeExecutors(...).
* @param currComp The current component.
* @param componentMap The map from component Id to component object.
* @param compToExecsToSchedule The map from component Id to unscheduled executors.
* @return The executors to schedule in order.
*/
private List<ExecutorDetails> takeExecutors(Component currComp,
final Map<String, Component> componentMap,
final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
List<ExecutorDetails> execsScheduled = new ArrayList<>();
Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get(currComp.getId());
int currUnscheduledNumExecs = currQueue.size();
//Just for defensive programming as this won't actually happen.
if (currUnscheduledNumExecs == 0) {
return execsScheduled;
}
execsScheduled.add(currQueue.poll());
Set<String> sortedChildren = getSortedChildren(currComp, componentMap);
for (String childId: sortedChildren) {
Component childComponent = componentMap.get(childId);
Queue<ExecutorDetails> childQueue = compToExecsToSchedule.get(childId);
int childUnscheduledNumExecs = childQueue.size();
if (childUnscheduledNumExecs == 0) {
continue;
}
int numExecsToTake = 1;
if (hasShuffleGroupingFromParentToChild(currComp, childComponent)) {
// if it's shuffle grouping, truncate
numExecsToTake = Math.max(1, childUnscheduledNumExecs / currUnscheduledNumExecs);
} // otherwise, one-by-one
for (int i = 0; i < numExecsToTake; i++) {
execsScheduled.addAll(takeExecutors(childComponent, componentMap, compToExecsToSchedule));
}
}
return execsScheduled;
}
private Set<String> getSortedChildren(Component component, final Map<String, Component> componentMap) {
Set<String> children = component.getChildren();
Set<String> sortedChildren =
new TreeSet<>((o1, o2) -> {
Component child1 = componentMap.get(o1);
Component child2 = componentMap.get(o2);
boolean child1IsShuffle = hasShuffleGroupingFromParentToChild(component, child1);
boolean child2IsShuffle = hasShuffleGroupingFromParentToChild(component, child2);
if (child1IsShuffle && child2IsShuffle) {
return o1.compareTo(o2);
} else if (child1IsShuffle) {
return 1;
} else {
return -1;
}
});
sortedChildren.addAll(children);
return sortedChildren;
}
private boolean hasShuffleGroupingFromParentToChild(Component parent, Component child) {
for (Map.Entry<GlobalStreamId, Grouping> inputEntry: child.getInputs().entrySet()) {
GlobalStreamId globalStreamId = inputEntry.getKey();
Grouping grouping = inputEntry.getValue();
if (globalStreamId.get_componentId().equals(parent.getId())
&& (inputEntry.getValue().is_set_local_or_shuffle() || grouping.is_set_shuffle())) {
return true;
}
}
return false;
}
}