Merge branch 'STORM-3407'
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 0051a8d..8434b1d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -334,6 +334,8 @@
topology.worker.max.heap.size.mb: 768.0
topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy"
resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy"
+topology.ras.constraint.max.state.search: 10_000 # The maximum number of states that will be searched looking for a solution in the constraint solver strategy
+resource.aware.scheduler.constraint.max.state.search: 100_000 # Daemon limit on maximum number of states that will be searched looking for a solution in the constraint solver strategy
blacklist.scheduler.tolerance.time.secs: 300
blacklist.scheduler.tolerance.count: 3
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 10a52b8..e86e165 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -315,6 +315,13 @@
@isPositiveNumber
public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search";
/**
+ * The maximum number of states that will be searched looking for a solution in the constraint solver strategy.
+ * Backward compatibility config value, read only for topologies whose Storm version is unset or does not start with "2".
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL = "topology.ras.constraint.max.state.traversal";
+ /**
* The maximum number of seconds to spend scheduling a topology using the constraint solver. Null means no limit.
*/
@isInteger
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index a974e46..bbd59c2 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -936,6 +936,13 @@
public static final String RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS =
"resource.aware.scheduler.max.topology.scheduling.attempts";
+ /**
+ * The daemon-side maximum number of states that will be searched looking for a solution in the constraint solver strategy.
+ */
+ @isInteger
+ @isPositiveNumber
+ public static final String RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH = "resource.aware.scheduler.constraint.max.state.search";
+
/**
* How often nimbus's background thread to sync code for missing topologies should run.
*/
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index 64d5acb..0a6b279 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -25,6 +25,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
@@ -42,8 +43,6 @@
public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
//hard coded max number of states to search
- public static final int MAX_STATE_SEARCH = 100_000;
- public static final int DEFAULT_STATE_SEARCH = 10_000;
private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class);
//constraints and spreads
@@ -248,10 +247,22 @@
nodes = RAS_Nodes.getAllNodesFrom(cluster);
Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>();
Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
- //set max number of states to search
- final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
- ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH),
- DEFAULT_STATE_SEARCH));
+
+ //set max number of states to search maintaining backward compatibility for old topologies
+ String stormVersionString = td.getTopology().get_storm_version();
+ boolean is2xTopology = stormVersionString != null && stormVersionString.startsWith("2");
+
+ Object confMaxStateSearch = null;
+ if (is2xTopology == false) {
+ //backward compatibility
+ confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL);
+ }
+ if (confMaxStateSearch == null) {
+ //new topology or old topology using new config
+ confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH);
+ }
+ int daemonMaxStateSearch = ObjectReader.getInt(td.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH));
+ final int maxStateSearch = Math.min(daemonMaxStateSearch, ObjectReader.getInt(confMaxStateSearch));
final long maxTimeMs =
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L;
@@ -290,7 +301,7 @@
}
//early detection/early fail
- if (!checkSchedulingFeasibility()) {
+ if (!checkSchedulingFeasibility(maxStateSearch)) {
//Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology
return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!");
}
@@ -298,7 +309,7 @@
.asSchedulingResult();
}
- private boolean checkSchedulingFeasibility() {
+ private boolean checkSchedulingFeasibility(int maxStateSearch) {
for (String comp : spreadComps) {
int numExecs = compToExecs.get(comp).size();
if (numExecs > nodes.size()) {
@@ -307,9 +318,9 @@
return false;
}
}
- if (execToComp.size() >= MAX_STATE_SEARCH) {
+ if (execToComp.size() >= maxStateSearch) {
LOG.error("Number of executors is greater than the maximum number of states allowed to be searched. "
- + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH);
+ + "# of executors: {} Max states to search: {}", execToComp.size(), maxStateSearch);
return false;
}
return true;
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index d609bb6..2bad23a 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -70,6 +70,7 @@
spread.add("spout-0");
Map<String, Object> config = Utils.readDefaultConfig();
+ config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);
config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
@@ -206,6 +207,7 @@
Map<String, Object> config = Utils.readDefaultConfig();
config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH);
config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName());
config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread);
config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints);