Merge pull request #3098 from Ethanlm/STORM-3481
[STORM-3481] fix IllegalArgumentException for ConstraintSolverStrategy
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 2866b03..a2aa58a 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -316,13 +316,6 @@
@IsPositiveNumber
public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
/**
- * The maximum number of states that will be searched looking for a solution in the constraint solver strategy.
- * Backward compatibility config value for old topologies
- */
- @IsInteger
- @IsPositiveNumber
- public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL = "topology.ras.constraint.max.state.traversal";
- /**
* The maximum number of seconds to spend scheduling a topology using the constraint solver. Null means no limit.
*/
@IsInteger
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 0a6b279..9f8265d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -248,24 +248,11 @@
Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
- //set max number of states to search maintaining backward compatibility for old topologies
- String stormVersionString = td.getTopology().get_storm_version();
- boolean is2xTopology = stormVersionString != null && stormVersionString.startsWith("2");
+ int confMaxStateSearch = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
+ int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
+ final int maxStateSearch = Math.min(daemonMaxStateSearch, confMaxStateSearch);
- Object confMaxStateSearch = null;
- if (is2xTopology == false) {
- //backward compatibility
- confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL);
- }
- if (confMaxStateSearch == null) {
- //new topology or old topology using new config
- confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH);
- }
- int daemonMaxStateSearch = ObjectReader.getInt(td.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
- final int maxStateSearch = Math.min(daemonMaxStateSearch, ObjectReader.getInt(confMaxStateSearch));
-
- final long maxTimeMs =
- ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
+ final long maxTimeMs = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1) * 1000L;
favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 2bad23a..4ae5c88 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -87,10 +87,16 @@
return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user");
}
- public Cluster makeCluster(TopologyDetails topo) {
- Topologies topologies = new Topologies(topo);
- Map<String, SupervisorDetails> supMap = genSupervisors(4, 2, 120, 1200);
- return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, new Config());
+ public Cluster makeCluster(Topologies topologies) {
+ return makeCluster(topologies, null);
+ }
+
+ public Cluster makeCluster(Topologies topologies, Map<String, SupervisorDetails> supMap) {
+ if (supMap == null) {
+ supMap = genSupervisors(4, 2, 120, 1200);
+ }
+ Map<String, Object> config = Utils.readDefaultConfig();
+ return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
}
public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel) {
@@ -98,7 +104,8 @@
cs.prepare(config);
TopologyDetails topo = makeTopology(config, boltParallel);
- Cluster cluster = makeCluster(topo);
+ Topologies topologies = new Topologies(topo);
+ Cluster cluster = makeCluster(topologies);
LOG.info("Scheduling...");
SchedulingResult result = cs.schedule(cluster, topo);
@@ -164,7 +171,8 @@
cs.prepare(config);
TopologyDetails topo = makeTopology(config, NORMAL_BOLT_PARALLEL);
- Cluster cluster = makeCluster(topo);
+ Topologies topologies = new Topologies(topo);
+ Cluster cluster = makeCluster(topologies);
LOG.info("Scheduling...");
SchedulingResult result = cs.schedule(cluster, topo);
@@ -196,8 +204,6 @@
@Test
public void testIntegrationWithRAS() {
- Map<String, SupervisorDetails> supMap = genSupervisors(30, 16, 400, 1024 * 4);
-
List<List<String>> constraints = new LinkedList<>();
addContraints("spout-0", "bolt-0", constraints);
addContraints("bolt-1", "bolt-1", constraints);
@@ -206,8 +212,6 @@
spread.add("spout-0");
Map<String, Object> config = Utils.readDefaultConfig();
- config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
- config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
@@ -222,7 +226,8 @@
Map<String, TopologyDetails> topoMap = new HashMap<>();
topoMap.put(topo.getId(), topo);
Topologies topologies = new Topologies(topoMap);
- Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+ Map<String, SupervisorDetails> supMap = genSupervisors(30, 16, 400, 1024 * 4);
+ Cluster cluster = makeCluster(topologies, supMap);
ResourceAwareScheduler rs = new ResourceAwareScheduler();
rs.prepare(config);
try {