Merge branch 'STORM-3407'
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 0051a8d..8434b1d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -334,6 +334,8 @@
 topology.worker.max.heap.size.mb: 768.0
 topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
 resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
+topology.ras.constraint.max.state.search: 10_000     # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
+resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy
 
 blacklist.scheduler.tolerance.time.secs: 300
 blacklist.scheduler.tolerance.count: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 10a52b8..e86e165 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -315,6 +315,13 @@
     @isPositiveNumber
     public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
     /**
+     * The maximum number of states that will be searched looking for a solution in the constraint solver strategy.
+     * Backward compatibility config value for old topologies
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL = "topology.ras.constraint.max.state.traversal";
+    /**
      * The maximum number of seconds to spend scheduling a topology using the constraint solver.  Null means no limit.
      */
     @isInteger
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index a974e46..bbd59c2 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -936,6 +936,13 @@
     public static final String RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS =
         "resource.aware.scheduler.max.topology.scheduling.attempts";
 
+    /*
+     * The maximum number of states that will be searched looking for a solution in the constraint solver strategy
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH = "resource.aware.scheduler.constraint.max.state.search";
+
     /**
      * How often nimbus's background thread to sync code for missing topologies should run.
      */
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 64d5acb..0a6b279 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -25,6 +25,7 @@
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.ExecutorDetails;
 import org.apache.storm.scheduler.SchedulerAssignment;
@@ -42,8 +43,6 @@
 
 public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
     //hard coded max number of states to search
-    public static final int MAX_STATE_SEARCH = 100_000;
-    public static final int DEFAULT_STATE_SEARCH = 10_000;
     private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
 
     //constraints and spreads
@@ -248,10 +247,22 @@
         nodes = RAS_Nodes.getAllNodesFrom(cluster);
         Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
         Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
-        //set max number of states to search
-        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
-                                            ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH),
-                                                                DEFAULT_STATE_SEARCH));
+
+        //set max number of states to search maintaining backward compatibility for old topologies
+        String stormVersionString = td.getTopology().get_storm_version();
+        boolean is2xTopology = stormVersionString != null && stormVersionString.startsWith("2");
+
+        Object confMaxStateSearch = null;
+        if (is2xTopology == false) {
+            //backward compatibility
+            confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL);
+        }
+        if (confMaxStateSearch == null) {
+            //new topology or old topology using new config
+            confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH);
+        }
+        int daemonMaxStateSearch = ObjectReader.getInt(td.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
+        final int maxStateSearch = Math.min(daemonMaxStateSearch, ObjectReader.getInt(confMaxStateSearch));
 
         final long maxTimeMs =
             ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
@@ -290,7 +301,7 @@
         }
 
         //early detection/early fail
-        if (!checkSchedulingFeasibility()) {
+        if (!checkSchedulingFeasibility(maxStateSearch)) {
             //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
             return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
         }
@@ -298,7 +309,7 @@
             .asSchedulingResult();
     }
 
-    private boolean checkSchedulingFeasibility() {
+    private boolean checkSchedulingFeasibility(int maxStateSearch) {
         for (String comp : spreadComps) {
             int numExecs = compToExecs.get(comp).size();
             if (numExecs > nodes.size()) {
@@ -307,9 +318,9 @@
                 return false;
             }
         }
-        if (execToComp.size() >= MAX_STATE_SEARCH) {
+        if (execToComp.size() >= maxStateSearch) {
             LOG.error("Number of executors is greater than the maximum number of states allowed to be searched.  "
-                      + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
+                      + "# of executors: {} Max states to search: {}", execToComp.size(), maxStateSearch);
             return false;
         }
         return true;
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index d609bb6..2bad23a 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -70,6 +70,7 @@
         spread.add("spout-0");
 
         Map<String, Object> config = Utils.readDefaultConfig();
+        config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
         config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
@@ -206,6 +207,7 @@
 
         Map<String, Object> config = Utils.readDefaultConfig();
         config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
+        config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
         config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
         config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);