blob: eda375339bc59f4cabbf23e2807cfe78ac373b53 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.scheduler.resource.strategies.scheduling.sorter;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import org.apache.storm.scheduler.Component;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.shade.com.google.common.collect.Sets;
public class ExecSorterByConnectionCount implements IExecSorter {
protected TopologyDetails topologyDetails;
public ExecSorterByConnectionCount(TopologyDetails topologyDetails) {
this.topologyDetails = topologyDetails;
}
/**
* Order executors based on how many in and out connections it will potentially need to make, in descending order. First order
* components by the number of in and out connections it will have. Then iterate through the sorted list of components. For each
* component sort the neighbors of that component by how many connections it will have to make with that component.
* Add an executor from this component and then from each neighboring component in sorted order. Do this until there is
* nothing left to schedule. Then add back executors not accounted for - which are system executors.
*
* @param unassignedExecutors an unmodifiable set of executors that need to be scheduled.
* @return a list of executors in sorted order for scheduling.
*/
public List<ExecutorDetails> sortExecutors(Set<ExecutorDetails> unassignedExecutors) {
Map<String, Component> componentMap = topologyDetails.getComponents(); // excludes system components
LinkedHashSet<ExecutorDetails> orderedExecutorSet = new LinkedHashSet<>(); // in insert order
Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
for (Component component : componentMap.values()) {
compToExecsToSchedule.put(component.getId(), new LinkedList<>());
for (ExecutorDetails exec : component.getExecs()) {
if (unassignedExecutors.contains(exec)) {
compToExecsToSchedule.get(component.getId()).add(exec);
}
}
}
Set<Component> sortedComponents = sortComponents(componentMap);
sortedComponents.addAll(componentMap.values());
for (Component currComp : sortedComponents) {
Map<String, Component> neighbors = new HashMap<>();
for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
neighbors.put(compId, componentMap.get(compId));
}
Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
Queue<ExecutorDetails> currCompExecsToSched = compToExecsToSchedule.get(currComp.getId());
boolean flag;
do {
flag = false;
if (!currCompExecsToSched.isEmpty()) {
orderedExecutorSet.add(currCompExecsToSched.poll());
flag = true;
}
for (Component neighborComp : sortedNeighbors) {
Queue<ExecutorDetails> neighborCompExesToSched = compToExecsToSchedule.get(neighborComp.getId());
if (!neighborCompExesToSched.isEmpty()) {
orderedExecutorSet.add(neighborCompExesToSched.poll());
flag = true;
}
}
} while (flag);
}
// add executors not in sorted list - which may be system executors
orderedExecutorSet.addAll(unassignedExecutors);
return new LinkedList<>(orderedExecutorSet);
}
/**
* sort components by the number of in and out connections that need to be made, in descending order.
*
* @param componentMap The components that need to be sorted
* @return a sorted set of components
*/
private Set<Component> sortComponents(final Map<String, Component> componentMap) {
Set<Component> sortedComponents =
new TreeSet<>((o1, o2) -> {
int connections1 = 0;
int connections2 = 0;
for (String childId : Sets.union(o1.getChildren(), o1.getParents())) {
connections1 +=
(componentMap.get(childId).getExecs().size() * o1.getExecs().size());
}
for (String childId : Sets.union(o2.getChildren(), o2.getParents())) {
connections2 +=
(componentMap.get(childId).getExecs().size() * o2.getExecs().size());
}
if (connections1 > connections2) {
return -1;
} else if (connections1 < connections2) {
return 1;
} else {
return o1.getId().compareTo(o2.getId());
}
});
sortedComponents.addAll(componentMap.values());
return sortedComponents;
}
/**
* Sort a component's neighbors by the number of connections it needs to make with this component.
*
* @param thisComp the component that we need to sort its neighbors
* @param componentMap all the components to sort
* @return a sorted set of components
*/
private Set<Component> sortNeighbors(
final Component thisComp, final Map<String, Component> componentMap) {
Set<Component> sortedComponents =
new TreeSet<>((o1, o2) -> {
int connections1 = o1.getExecs().size() * thisComp.getExecs().size();
int connections2 = o2.getExecs().size() * thisComp.getExecs().size();
if (connections1 < connections2) {
return -1;
} else if (connections1 > connections2) {
return 1;
} else {
return o1.getId().compareTo(o2.getId());
}
});
sortedComponents.addAll(componentMap.values());
return sortedComponents;
}
}