Merge pull request #3277 from agresch/agresch_storm_3641
STORM-3641 upgrade metrics API for JCQueue
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 3316f42..c27b468 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -313,6 +313,13 @@
public static final String TOPOLOGY_SCHEDULER_STRATEGY = "topology.scheduler.strategy";
/**
+ * When DefaultResourceAwareStrategy or GenericResourceAwareStrategy is used,
+ * the scheduler sorts unassigned executors in a particular order.
+ * If this config is set to true, unassigned executors are sorted in topological order, taking network proximity needs into account.
+ */
+ public static final String TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS = "topology.ras.order.executors.by.proximity.needs";
+
+ /**
* Declare scheduling constraints for a topology used by the constraint solver strategy. The format can be either
old style (validated by ListOfListOfStringValidator.class) or the newer style, which is a list of a specific type of
* Maps (validated by RasConstraintsTypeValidator.class). The value must be in one or the other format.
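
A minimal sketch of opting a topology into the new ordering (the wrapper class and printed value are illustrative, not part of this change):

    import org.apache.storm.Config;

    public class ProximityOrderingConfigSketch {
        public static void main(String[] args) {
            Config conf = new Config();
            // Defaults to false; when true, the RAS strategies order unassigned
            // executors topologically, taking network proximity needs into account.
            conf.put(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS, true);
            System.out.println(conf.get(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS));
        }
    }
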
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index f7fdda7..f907a23 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -12,6 +12,7 @@
package org.apache.storm.security.auth.kerberos;
+import com.codahale.metrics.Gauge;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.security.Principal;
@@ -28,7 +29,6 @@
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.xml.bind.DatatypeConverter;
-import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetricsRegistrant;
import org.apache.storm.security.auth.ClientAuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
@@ -114,6 +114,7 @@
LOG.info("Got a Subject " + s);
}
+ @Override
public void prepare(Map<String, Object> conf) {
this.conf = conf;
}
@@ -280,7 +281,11 @@
@Override
public void registerMetrics(TopologyContext topoContext, Map<String, Object> topoConf) {
- int bucketSize = ((Number) topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
- topoContext.registerMetric("TGT-TimeToExpiryMsecs", () -> getMsecsUntilExpiration(), bucketSize);
+ topoContext.registerGauge("TGT-TimeToExpiryMsecs", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return getMsecsUntilExpiration();
+ }
+ });
}
}
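
Side note on the metrics call above: com.codahale.metrics.Gauge<T> has a single abstract method (getValue), so the anonymous class could likely also be written as a lambda or method reference. A self-contained sketch against a plain MetricRegistry (the class and placeholder value are illustrative, not part of this change):

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    public class GaugeLambdaSketch {
        // Stand-in for AutoTGT.getMsecsUntilExpiration(); the value is a placeholder.
        static long getMsecsUntilExpiration() {
            return 60_000L;
        }

        public static void main(String[] args) {
            MetricRegistry registry = new MetricRegistry();
            // Equivalent of the anonymous Gauge<Long> above, written as a method reference.
            registry.register("TGT-TimeToExpiryMsecs", (Gauge<Long>) GaugeLambdaSketch::getMsecsUntilExpiration);
            System.out.println(registry.getGauges().get("TGT-TimeToExpiryMsecs").getValue());
        }
    }
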
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
index c4a28c3..7f5953a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Component.java
@@ -18,10 +18,14 @@
package org.apache.storm.scheduler;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.ComponentType;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
public class Component {
private final String id;
@@ -29,6 +33,7 @@
private final ComponentType type;
private final Set<String> parents = new HashSet<>();
private final Set<String> children = new HashSet<>();
+ private final Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
/**
* Create a new component.
@@ -37,10 +42,11 @@
* @param compId the id of the component
* @param execs the executors for this component.
*/
- public Component(ComponentType type, String compId, List<ExecutorDetails> execs) {
+ public Component(ComponentType type, String compId, List<ExecutorDetails> execs, Map<GlobalStreamId, Grouping> inputs) {
this.type = type;
this.id = compId;
this.execs = execs;
+ this.inputs.putAll(inputs);
}
/**
@@ -73,16 +79,19 @@
return children;
}
+ public Map<GlobalStreamId, Grouping> getInputs() {
+ return inputs;
+ }
+
@Override
public String toString() {
- return "{id: "
- + getId()
- + " Parents: "
- + getParents()
- + " Children: "
- + getChildren()
- + " Execs: "
- + getExecs()
- + "}";
+ return "Component{"
+ + "id='" + id + '\''
+ + ", execs=" + execs
+ + ", type=" + type
+ + ", parents=" + parents
+ + ", children=" + children
+ + ", inputs=" + inputs
+ + '}';
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
index be20834..551a94b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/TopologyDetails.java
@@ -207,8 +207,9 @@
if (spouts != null) {
for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
String compId = entry.getKey();
+ SpoutSpec spout = entry.getValue();
if (!Utils.isSystemId(compId)) {
- Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId));
+ Component comp = new Component(ComponentType.SPOUT, compId, componentToExecs(compId), spout.get_common().get_inputs());
ret.put(compId, comp);
}
}
@@ -216,8 +217,9 @@
if (bolts != null) {
for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
String compId = entry.getKey();
+ Bolt bolt = entry.getValue();
if (!Utils.isSystemId(compId)) {
- Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId));
+ Component comp = new Component(ComponentType.BOLT, compId, componentToExecs(compId), bolt.get_common().get_inputs());
ret.put(compId, comp);
}
}
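
For context, the inputs map handed to Component above comes straight from the Thrift ComponentCommon of each spout or bolt. A small sketch of inspecting those groupings on a toy topology (the spout/bolt implementations from org.apache.storm.testing are used purely for illustration):

    import java.util.Map;
    import org.apache.storm.generated.Bolt;
    import org.apache.storm.generated.GlobalStreamId;
    import org.apache.storm.generated.Grouping;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.testing.TestWordCounter;
    import org.apache.storm.testing.TestWordSpout;
    import org.apache.storm.topology.TopologyBuilder;

    public class ComponentInputsSketch {
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new TestWordSpout(), 1);
            builder.setBolt("bolt-1", new TestWordCounter(), 2).shuffleGrouping("spout");
            StormTopology topology = builder.createTopology();

            for (Map.Entry<String, Bolt> bolt : topology.get_bolts().entrySet()) {
                // The same Map<GlobalStreamId, Grouping> that TopologyDetails now hands to Component.
                Map<GlobalStreamId, Grouping> inputs = bolt.getValue().get_common().get_inputs();
                for (Map.Entry<GlobalStreamId, Grouping> input : inputs.entrySet()) {
                    boolean shuffle = input.getValue().is_set_shuffle() || input.getValue().is_set_local_or_shuffle();
                    System.out.println(bolt.getKey() + " <- " + input.getKey().get_componentId() + ", shuffle=" + shuffle);
                }
            }
        }
    }
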
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index d6a1bc3..87345a0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -35,7 +35,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.storm.Config;
import org.apache.storm.generated.ComponentType;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
import org.apache.storm.networktopography.DNSToSwitchMapping;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.Component;
@@ -51,6 +54,7 @@
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -511,6 +515,18 @@
return sortedComponents;
}
+ protected List<ExecutorDetails> orderExecutors(
+ TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
+ Boolean orderByProximity = ObjectReader.getBoolean(
+ td.getConf().get(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS), false);
+ if (!orderByProximity) {
+ return orderExecutorsDefault(td, unassignedExecutors);
+ } else {
+ LOG.info("{} is set to true", Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS);
+ return orderExecutorsByProximityNeeds(td, unassignedExecutors);
+ }
+ }
+
/**
* Order executors based on how many in and out connections it will potentially need to make, in descending order. First order
* components by the number of in and out connections it will have. Then iterate through the sorted list of components. For each
@@ -522,7 +538,7 @@
* this list
* @return a list of executors in sorted order
*/
- protected List<ExecutorDetails> orderExecutors(
+ private List<ExecutorDetails> orderExecutorsDefault(
TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
Map<String, Component> componentMap = td.getComponents();
List<ExecutorDetails> execsScheduled = new LinkedList<>();
@@ -570,6 +586,157 @@
}
/**
+ * Order executors by network proximity needs.
+ * @param td The topology the executors belong to
+ * @param unassignedExecutors a collection of unassigned executors that need to be assigned. Should only try to
+ * assign executors from this list
+ * @return a list of executors in sorted order
+ */
+ private List<ExecutorDetails> orderExecutorsByProximityNeeds(
+ TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
+ Map<String, Component> componentMap = td.getComponents();
+ List<ExecutorDetails> execsScheduled = new LinkedList<>();
+
+ Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
+ for (Component component : componentMap.values()) {
+ compToExecsToSchedule.put(component.getId(), new LinkedList<>());
+ for (ExecutorDetails exec : component.getExecs()) {
+ if (unassignedExecutors.contains(exec)) {
+ compToExecsToSchedule.get(component.getId()).add(exec);
+ }
+ }
+ }
+
+ List<Component> sortedComponents = topologicalSortComponents(componentMap);
+
+ for (Component currComp: sortedComponents) {
+ int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
+ for (int i = 0; i < numExecs; i++) {
+ execsScheduled.addAll(takeExecutors(currComp, componentMap, compToExecsToSchedule));
+ }
+ }
+
+ return execsScheduled;
+ }
+
+ /**
+ * Sort components topologically.
+ * @param componentMap The map of component Id to Component Object.
+ * @return The sorted components
+ */
+ private List<Component> topologicalSortComponents(final Map<String, Component> componentMap) {
+ List<Component> sortedComponents = new ArrayList<>();
+ boolean[] visited = new boolean[componentMap.size()];
+ int[] inDegree = new int[componentMap.size()];
+ List<String> componentIds = new ArrayList<>(componentMap.keySet());
+ Map<String, Integer> compIdToIndex = new HashMap<>();
+ for (int i = 0; i < componentIds.size(); i++) {
+ compIdToIndex.put(componentIds.get(i), i);
+ }
+ //initialize the in-degree array
+ for (int i = 0; i < inDegree.length; i++) {
+ String compId = componentIds.get(i);
+ Component comp = componentMap.get(compId);
+ for (String childId : comp.getChildren()) {
+ inDegree[compIdToIndex.get(childId)] += 1;
+ }
+ }
+ //sorting components topologically
+ for (int t = 0; t < inDegree.length; t++) {
+ for (int i = 0; i < inDegree.length; i++) {
+ if (inDegree[i] == 0 && !visited[i]) {
+ String compId = componentIds.get(i);
+ Component comp = componentMap.get(compId);
+ sortedComponents.add(comp);
+ visited[i] = true;
+ for (String childId : comp.getChildren()) {
+ inDegree[compIdToIndex.get(childId)]--;
+ }
+ break;
+ }
+ }
+ }
+ return sortedComponents;
+ }
+
+ /**
+ * Take unscheduled executors from the current component and all of its downstream components in a particular order.
+ * First, take one executor from the current component.
+ * Then, for every child (direct downstream component) of this component:
+ * if the connection from the current component to this child is a shuffle grouping,
+ * the number of executors to take from this child is the max of 1 and
+ * (the number of unscheduled executors this child has / the number of unscheduled executors the current component has);
+ * otherwise, the number of executors to take is 1.
+ * For every executor to take from this child, call takeExecutors(...) recursively.
+ * @param currComp The current component.
+ * @param componentMap The map from component Id to component object.
+ * @param compToExecsToSchedule The map from component Id to unscheduled executors.
+ * @return The executors to schedule in order.
+ */
+ private List<ExecutorDetails> takeExecutors(Component currComp,
+ final Map<String, Component> componentMap,
+ final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
+ List<ExecutorDetails> execsScheduled = new ArrayList<>();
+ Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get(currComp.getId());
+ int currUnscheduledNumExecs = currQueue.size();
+ //Defensive programming; this should not actually happen.
+ if (currUnscheduledNumExecs == 0) {
+ return execsScheduled;
+ }
+ execsScheduled.add(currQueue.poll());
+ Set<String> sortedChildren = getSortedChildren(currComp, componentMap);
+ for (String childId: sortedChildren) {
+ Component childComponent = componentMap.get(childId);
+ Queue<ExecutorDetails> childQueue = compToExecsToSchedule.get(childId);
+ int childUnscheduledNumExecs = childQueue.size();
+ if (childUnscheduledNumExecs == 0) {
+ continue;
+ }
+ int numExecsToTake = 1;
+ if (hasShuffleGroupingFromParentToChild(currComp, childComponent)) {
+ // if it's a shuffle grouping, take a proportional share (integer division truncates)
+ numExecsToTake = Math.max(1, childUnscheduledNumExecs / currUnscheduledNumExecs);
+ } // otherwise, one-by-one
+ for (int i = 0; i < numExecsToTake; i++) {
+ execsScheduled.addAll(takeExecutors(childComponent, componentMap, compToExecsToSchedule));
+ }
+ }
+ return execsScheduled;
+ }
+
+ private Set<String> getSortedChildren(Component component, final Map<String, Component> componentMap) {
+ Set<String> children = component.getChildren();
+ Set<String> sortedChildren =
+ new TreeSet<>((o1, o2) -> {
+ Component child1 = componentMap.get(o1);
+ Component child2 = componentMap.get(o2);
+ boolean child1IsShuffle = hasShuffleGroupingFromParentToChild(component, child1);
+ boolean child2IsShuffle = hasShuffleGroupingFromParentToChild(component, child2);
+ if (child1IsShuffle && child2IsShuffle) {
+ return o1.compareTo(o2);
+ } else if (child1IsShuffle) {
+ return 1;
+ } else {
+ return -1;
+ }
+ });
+ sortedChildren.addAll(children);
+ return sortedChildren;
+ }
+
+ private boolean hasShuffleGroupingFromParentToChild(Component parent, Component child) {
+ for (Map.Entry<GlobalStreamId, Grouping> inputEntry: child.getInputs().entrySet()) {
+ GlobalStreamId globalStreamId = inputEntry.getKey();
+ Grouping grouping = inputEntry.getValue();
+ if (globalStreamId.get_componentId().equals(parent.getId())
+ && (grouping.is_set_local_or_shuffle() || grouping.is_set_shuffle())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Get a list of all the spouts in the topology.
*
* @param td topology to get spouts from
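
To make the ordering above concrete, here is a toy, self-contained sketch (not Storm code; all names are illustrative) that mirrors the recursion described in the takeExecutors javadoc, on the same shuffle-grouped spout -> bolt-1 -> bolt-2 -> bolt-3 topology used by the tests below. Components are assumed to be already topologically sorted here; with non-shuffle groupings the per-child take would be capped at 1, interleaving executors one by one instead.

    import java.util.*;

    public class ProximityOrderSketch {
        public static void main(String[] args) {
            // component -> children (all edges are shuffle groupings in this toy example)
            Map<String, List<String>> children = new LinkedHashMap<>();
            children.put("spout", Arrays.asList("bolt-1"));
            children.put("bolt-1", Arrays.asList("bolt-2"));
            children.put("bolt-2", Arrays.asList("bolt-3"));
            children.put("bolt-3", Collections.emptyList());

            // component -> queue of unscheduled executor ids (spout=1, each bolt=2, as in the tests)
            Map<String, Deque<String>> toSchedule = new LinkedHashMap<>();
            toSchedule.put("spout", new ArrayDeque<>(Arrays.asList("spout-0")));
            toSchedule.put("bolt-1", new ArrayDeque<>(Arrays.asList("bolt-1-0", "bolt-1-1")));
            toSchedule.put("bolt-2", new ArrayDeque<>(Arrays.asList("bolt-2-0", "bolt-2-1")));
            toSchedule.put("bolt-3", new ArrayDeque<>(Arrays.asList("bolt-3-0", "bolt-3-1")));

            List<String> order = new ArrayList<>();
            // components are already listed in topological order in this toy map
            for (String comp : children.keySet()) {
                int n = toSchedule.get(comp).size();
                for (int i = 0; i < n; i++) {
                    take(comp, children, toSchedule, order);
                }
            }
            System.out.println(order);
            // prints: [spout-0, bolt-1-0, bolt-2-0, bolt-3-0, bolt-1-1, bolt-2-1, bolt-3-1]
        }

        // Same shape as takeExecutors(...): take one executor from comp, then recurse into each child,
        // taking max(1, childUnscheduled / currentUnscheduled) executors per child since every edge
        // here is a shuffle grouping.
        static void take(String comp, Map<String, List<String>> children,
                         Map<String, Deque<String>> toSchedule, List<String> order) {
            Deque<String> queue = toSchedule.get(comp);
            int current = queue.size();
            if (current == 0) {
                return;
            }
            order.add(queue.poll());
            for (String child : children.get(comp)) {
                int childLeft = toSchedule.get(child).size();
                if (childLeft == 0) {
                    continue;
                }
                int toTake = Math.max(1, childLeft / current);
                for (int i = 0; i < toTake; i++) {
                    take(child, children, toSchedule, order);
                }
            }
        }
    }
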
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
index a2f2437..3fd0d73 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestDefaultResourceAwareStrategy.java
@@ -475,6 +475,65 @@
}
/**
+ * test if the scheduling logic for the DefaultResourceAwareStrategy is correct when executors are ordered by network proximity needs.
+ */
+ @Test
+ public void testDefaultResourceAwareStrategyInFavorOfShuffle() {
+ int spoutParallelism = 1;
+ int boltParallelism = 2;
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestSpout(),
+ spoutParallelism);
+ builder.setBolt("bolt-1", new TestBolt(),
+ boltParallelism).shuffleGrouping("spout");
+ builder.setBolt("bolt-2", new TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-1");
+ builder.setBolt("bolt-3", new TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-2");
+
+ StormTopology stormToplogy = builder.createTopology();
+
+ INimbus iNimbus = new INimbusTest();
+ Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 150, 1500);
+ Config conf = createClusterConfig(50, 250, 250, null);
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
+ conf.put(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS, true);
+
+ TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), CURRENT_TIME, "user");
+
+ Topologies topologies = new Topologies(topo);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(conf, new StormMetricsRegistry());
+ rs.schedule(topologies, cluster);
+ // expected slot-to-executor assignment: <[[[0, 0], [6, 6], [2, 2]], [[3, 3]], [[5, 5], [4, 4], [1, 1]]]>
+
+ HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
+ expectedScheduling.add(new HashSet<>(Arrays.asList(
+ new ExecutorDetails(0, 0), //spout
+ new ExecutorDetails(6, 6), //bolt-2
+ new ExecutorDetails(2, 2)))); //bolt-1
+ expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3)))); //bolt-3
+ expectedScheduling.add(new HashSet<>(Arrays.asList(
+ new ExecutorDetails(5, 5), //bolt-2
+ new ExecutorDetails(4, 4), //bolt-3
+ new ExecutorDetails(1, 1)))); //bolt-1
+ HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
+ SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");
+ for (Collection<ExecutorDetails> execs : assignment.getSlotToExecutors().values()) {
+ foundScheduling.add(new HashSet<>(execs));
+ }
+
+ Assert.assertEquals(expectedScheduling, foundScheduling);
+ }
+
+ /**
* Test whether strategy will choose correct rack
*/
@Test
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
index 51252ad..79796e6 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestGenericResourceAwareStrategy.java
@@ -301,6 +301,69 @@
assertTopologiesFullyScheduled(cluster, gpu2);
}
+ /**
+ * test if the scheduling logic for the GenericResourceAwareStrategy is correct when executors are ordered by network proximity needs (in favor of shuffle).
+ */
+ @Test
+ public void testGenericResourceAwareStrategyInFavorOfShuffle() {
+ int spoutParallelism = 1;
+ int boltParallelism = 2;
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("spout", new TestSpout(),
+ spoutParallelism);
+ builder.setBolt("bolt-1", new TestBolt(),
+ boltParallelism).shuffleGrouping("spout");
+ builder.setBolt("bolt-2", new TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-1").addResource("gpu.count", 1.0);
+ builder.setBolt("bolt-3", new TestBolt(),
+ boltParallelism).shuffleGrouping("bolt-2").addResource("gpu.count", 2.0);
+
+ StormTopology stormToplogy = builder.createTopology();
+
+ INimbus iNimbus = new INimbusTest();
+
+ Config conf = createGrasClusterConfig(50, 250, 250, null, Collections.emptyMap());
+ Map<String, Double> genericResourcesMap = new HashMap<>();
+ genericResourcesMap.put("gpu.count", 2.0);
+ Map<String, SupervisorDetails> supMap = genSupervisors(4, 4, 150, 1500, genericResourcesMap);
+
+
+ conf.put(Config.TOPOLOGY_PRIORITY, 0);
+ conf.put(Config.TOPOLOGY_NAME, "testTopology");
+ conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, Double.MAX_VALUE);
+ conf.put(Config.TOPOLOGY_SUBMITTER_USER, "user");
+ conf.put(Config.TOPOLOGY_RAS_ORDER_EXECUTORS_BY_PROXIMITY_NEEDS, true);
+
+ TopologyDetails topo = new TopologyDetails("testTopology-id", conf, stormToplogy, 0,
+ genExecsAndComps(stormToplogy), currentTime, "user");
+
+ Topologies topologies = new Topologies(topo);
+ Cluster cluster = new Cluster(iNimbus, new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, conf);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(conf, new StormMetricsRegistry());
+ rs.schedule(topologies, cluster);
+
+ HashSet<HashSet<ExecutorDetails>> expectedScheduling = new HashSet<>();
+ expectedScheduling.add(new HashSet<>(Arrays.asList(
+ new ExecutorDetails(0, 0),
+ new ExecutorDetails(2, 2),
+ new ExecutorDetails(6, 6))));
+ expectedScheduling.add(new HashSet<>(Arrays.asList(
+ new ExecutorDetails(4, 4),
+ new ExecutorDetails(1, 1))));
+ expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(5, 5))));
+ expectedScheduling.add(new HashSet<>(Arrays.asList(new ExecutorDetails(3, 3))));
+ HashSet<HashSet<ExecutorDetails>> foundScheduling = new HashSet<>();
+ SchedulerAssignment assignment = cluster.getAssignmentById("testTopology-id");
+ for (Collection<ExecutorDetails> execs : assignment.getSlotToExecutors().values()) {
+ foundScheduling.add(new HashSet<>(execs));
+ }
+
+ assertEquals(expectedScheduling, foundScheduling);
+ }
+
@Test
public void testAntiAffinityWithMultipleTopologies() {
INimbus iNimbus = new INimbusTest();