[STORM-3519] Change ConstraintSolverStrategy::backtrackSearch to iteration (#3136)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index cacba7a..81994ba 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -320,51 +320,89 @@
return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
}
- // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
+ /**
+ * Try to schedule till successful or till limits (backtrack count or time) have been exceeded.
+ *
+ * @param state terminal state of the executor assignment.
+ * @return SolverResult with success attribute set to true or false indicating whether ALL executors were assigned.
+ */
@VisibleForTesting
protected SolverResult backtrackSearch(SearcherState state) {
- state.incStatesSearched();
- if (state.areSearchLimitsExceeded()) {
- LOG.warn("Limits Exceeded");
- return new SolverResult(state, false);
+ long startTimeMilli = System.currentTimeMillis();
+ int maxExecCnt = state.getExecSize();
+
+ // following three are state information at each "execIndex" level
+ int[] progressIdxForExec = new int[maxExecCnt];
+ RasNode[] nodeForExec = new RasNode[maxExecCnt];
+ WorkerSlot[] workerSlotForExec = new WorkerSlot[maxExecCnt];
+
+ for (int i = 0; i < maxExecCnt ; i++) {
+ progressIdxForExec[i] = -1;
}
+ LOG.info("backtrackSearch: will assign {} executors", maxExecCnt);
- if (Thread.currentThread().isInterrupted()) {
- return new SolverResult(state, false);
- }
+ OUTERMOST_LOOP:
+ for (int loopCnt = 0 ; true ; loopCnt++) {
+ LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}", loopCnt, state.execIndex);
+ if (state.areSearchLimitsExceeded()) {
+ LOG.warn("backtrackSearch: Search limits exceeded");
+ return new SolverResult(state, false);
+ }
- ExecutorDetails exec = state.currentExec();
- Iterable<String> sortedNodes = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
+ if (Thread.currentThread().isInterrupted()) {
+ return new SolverResult(state, false);
+ }
- for (String nodeId: sortedNodes) {
- RasNode node = nodes.get(nodeId);
- for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
- if (isExecAssignmentToWorkerValid(workerSlot, state)) {
+ int execIndex = state.execIndex;
+
+ ExecutorDetails exec = state.currentExec();
+ Iterable<String> sortedNodesIter = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
+
+ int progressIdx = -1;
+ for (String nodeId : sortedNodesIter) {
+ RasNode node = nodes.get(nodeId);
+ for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
+ progressIdx++;
+ if (progressIdx <= progressIdxForExec[execIndex]) {
+ continue;
+ }
+ progressIdxForExec[execIndex]++;
+ LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}, node/slot-ordinal = {}, nodeId = {}",
+ loopCnt, execIndex, progressIdx, nodeId);
+
+ if (!isExecAssignmentToWorkerValid(workerSlot, state)) {
+ continue;
+ }
+
+ state.incStatesSearched();
state.tryToSchedule(execToComp, node, workerSlot);
-
if (state.areAllExecsScheduled()) {
//Everything is scheduled correctly, so no need to search any more.
+ LOG.info("backtrackSearch: AllExecsScheduled at loopCnt = {} in {} milliseconds, elapsedtime in state={}",
+ loopCnt, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
return new SolverResult(state, true);
}
-
- SolverResult results = backtrackSearch(state.nextExecutor());
- if (results.success) {
- //We found a good result we are done.
- return results;
- }
-
- if (state.areSearchLimitsExceeded()) {
- //No need to search more it is not going to help.
- return new SolverResult(state, false);
- }
-
- //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
- state.backtrack(execToComp, node, workerSlot);
+ state = state.nextExecutor();
+ nodeForExec[execIndex] = node;
+ workerSlotForExec[execIndex] = workerSlot;
+ LOG.debug("backtrackSearch: Assigned execId={} to node={}, node/slot-ordinal={} at loopCnt={}",
+ execIndex, nodeId, progressIdx, loopCnt);
+ continue OUTERMOST_LOOP;
}
}
+ // If we get here, no node/slot accepted the current executor, so backtrack.
+ LOG.debug("backtrackSearch: Failed to schedule execId = {} at loopCnt = {}", execIndex, loopCnt);
+ if (execIndex == 0) {
+ break;
+ } else {
+ state.backtrack(execToComp, nodeForExec[execIndex - 1], workerSlotForExec[execIndex - 1]);
+ progressIdxForExec[execIndex] = -1;
+ }
}
- //Tried all of the slots and none of them worked.
- return new SolverResult(state, false);
+ boolean success = state.areAllExecsScheduled();
+ LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, elapsedtime in state={}",
+ success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+ return new SolverResult(state, success);
}
/**
@@ -531,6 +569,10 @@
return statesSearched;
}
+ public int getExecSize() {
+ return execs.size();
+ }
+
public boolean areSearchLimitsExceeded() {
return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index f43868d..a46f99e 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -195,12 +195,12 @@
@Override
protected SolverResult backtrackSearch(SearcherState state) {
//Each time we try to schedule a new component simulate taking 1 second longer
- Time.advanceTime(1_000);
+ Time.advanceTime(1_001);
return super.backtrackSearch(state);
}
};
- basicFailureTest(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS, 2, cs);
+ basicFailureTest(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS, 1, cs);
}
}
@@ -237,15 +237,9 @@
scheduler.schedule(topologies, cluster);
boolean scheduleSuccess = isStatusSuccess(cluster.getStatus(topo.getId()));
-
- if (parallelismMultiplier == 1) {
- Assert.assertTrue(scheduleSuccess);
- } else if (parallelismMultiplier == 20) {
- // For default JVM, scheduling currently fails due to StackOverflow.
- // For now just log the results of the test. Change to assert when StackOverflow issue is fixed.
- LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier", scheduleSuccess ? "succeeds" : "fails",
- parallelismMultiplier);
- }
+ LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier", scheduleSuccess ? "succeeds" : "fails",
+ parallelismMultiplier);
+ Assert.assertTrue(scheduleSuccess);
}
@Test