Merge pull request #3098 from Ethanlm/STORM-3481

[STORM-3481] fix IllegalArgumentException for ConstraintSolverStrategy
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 2866b03..a2aa58a 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -316,13 +316,6 @@
     @IsPositiveNumber
     public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
     /**
-     * The maximum number of states that will be searched looking for a solution in the constraint solver strategy.
-     * Backward compatibility config value for old topologies
-     */
-    @IsInteger
-    @IsPositiveNumber
-    public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL = "topology.ras.constraint.max.state.traversal";
-    /**
      * The maximum number of seconds to spend scheduling a topology using the constraint solver.  Null means no limit.
      */
     @IsInteger
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 0a6b279..9f8265d 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -248,24 +248,11 @@
         Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
         Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
 
-        //set max number of states to search maintaining backward compatibility for old topologies
-        String stormVersionString = td.getTopology().get_storm_version();
-        boolean is2xTopology = stormVersionString != null && stormVersionString.startsWith("2");
+        int confMaxStateSearch = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH));
+        int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
+        final int maxStateSearch = Math.min(daemonMaxStateSearch, confMaxStateSearch);
 
-        Object confMaxStateSearch = null;
-        if (is2xTopology == false) {
-            //backward compatibility
-            confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL);
-        }
-        if (confMaxStateSearch == null) {
-            //new topology or old topology using new config
-            confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH);
-        }
-        int daemonMaxStateSearch = ObjectReader.getInt(td.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
-        final int maxStateSearch = Math.min(daemonMaxStateSearch, ObjectReader.getInt(confMaxStateSearch));
-
-        final long maxTimeMs =
-            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
+        final long maxTimeMs = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1) * 1000L;
 
         favoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES));
         unFavoredNodeIds = makeHostToNodeIds((List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index 2bad23a..4ae5c88 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -87,10 +87,16 @@
         return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user");
     }
 
-    public Cluster makeCluster(TopologyDetails topo) {
-        Topologies topologies = new Topologies(topo);
-        Map<String, SupervisorDetails> supMap = genSupervisors(4, 2, 120, 1200);
-        return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, new Config());
+    public Cluster makeCluster(Topologies topologies) {
+        return makeCluster(topologies, null);
+    }
+
+    public Cluster makeCluster(Topologies topologies, Map<String, SupervisorDetails> supMap) {
+        if (supMap == null) {
+            supMap = genSupervisors(4, 2, 120, 1200);
+        }
+        Map<String, Object> config = Utils.readDefaultConfig();
+        return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
     }
 
     public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel) {
@@ -98,7 +104,8 @@
         cs.prepare(config);
 
         TopologyDetails topo = makeTopology(config, boltParallel);
-        Cluster cluster = makeCluster(topo);
+        Topologies topologies = new Topologies(topo);
+        Cluster cluster = makeCluster(topologies);
 
         LOG.info("Scheduling...");
         SchedulingResult result = cs.schedule(cluster, topo);
@@ -164,7 +171,8 @@
         cs.prepare(config);
 
         TopologyDetails topo = makeTopology(config, NORMAL_BOLT_PARALLEL);
-        Cluster cluster = makeCluster(topo);
+        Topologies topologies = new Topologies(topo);
+        Cluster cluster = makeCluster(topologies);
 
         LOG.info("Scheduling...");
         SchedulingResult result = cs.schedule(cluster, topo);
@@ -196,8 +204,6 @@
 
     @Test
     public void testIntegrationWithRAS() {
-        Map<String, SupervisorDetails> supMap = genSupervisors(30, 16, 400, 1024 * 4);
-
         List<List<String>> constraints = new LinkedList<>();
         addContraints("spout-0", "bolt-0", constraints);
         addContraints("bolt-1", "bolt-1", constraints);
@@ -206,8 +212,6 @@
         spread.add("spout-0");
 
         Map<String, Object> config = Utils.readDefaultConfig();
-        config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
-        config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
         config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
         config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
@@ -222,7 +226,8 @@
         Map<String, TopologyDetails> topoMap = new HashMap<>();
         topoMap.put(topo.getId(), topo);
         Topologies topologies = new Topologies(topoMap);
-        Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config);
+        Map<String, SupervisorDetails> supMap = genSupervisors(30, 16, 400, 1024 * 4);
+        Cluster cluster = makeCluster(topologies, supMap);
         ResourceAwareScheduler rs = new ResourceAwareScheduler();
         rs.prepare(config);
         try {