[STORM-3519] Change ConstraintSolverStrategy::backtrackSearch to iteration (#3136)

diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
index cacba7a..81994ba 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
@@ -320,51 +320,89 @@
         return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc);
     }
 
-    // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space
+    /**
+     * Try to schedule till successful or till limits (backtrack count or time) have been exceeded.
+     *
+     * @param state terminal state of the executor assignment.
+     * @return SolverResult with success attribute set to true or false indicting whether ALL executors were assigned.
+     */
     @VisibleForTesting
     protected SolverResult backtrackSearch(SearcherState state) {
-        state.incStatesSearched();
-        if (state.areSearchLimitsExceeded()) {
-            LOG.warn("Limits Exceeded");
-            return new SolverResult(state, false);
+        long         startTimeMilli     = System.currentTimeMillis();
+        int          maxExecCnt         = state.getExecSize();
+
+        // following three are state information at each "execIndex" level
+        int[]        progressIdxForExec = new int[maxExecCnt];
+        RasNode[]    nodeForExec        = new RasNode[maxExecCnt];
+        WorkerSlot[] workerSlotForExec  = new WorkerSlot[maxExecCnt];
+
+        for (int i = 0; i < maxExecCnt ; i++) {
+            progressIdxForExec[i] = -1;
         }
+        LOG.info("backtrackSearch: will assign {} executors", maxExecCnt);
 
-        if (Thread.currentThread().isInterrupted()) {
-            return new SolverResult(state, false);
-        }
+        OUTERMOST_LOOP:
+        for (int loopCnt = 0 ; true ; loopCnt++) {
+            LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}", loopCnt, state.execIndex);
+            if (state.areSearchLimitsExceeded()) {
+                LOG.warn("backtrackSearch: Search limits exceeded");
+                return new SolverResult(state, false);
+            }
 
-        ExecutorDetails exec = state.currentExec();
-        Iterable<String> sortedNodes = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
+            if (Thread.currentThread().isInterrupted()) {
+                return new SolverResult(state, false);
+            }
 
-        for (String nodeId: sortedNodes) {
-            RasNode node = nodes.get(nodeId);
-            for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
-                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
+            int execIndex = state.execIndex;
+
+            ExecutorDetails exec = state.currentExec();
+            Iterable<String> sortedNodesIter = sortAllNodes(state.td, exec, favoredNodeIds, unFavoredNodeIds);
+
+            int progressIdx = -1;
+            for (String nodeId : sortedNodesIter) {
+                RasNode node = nodes.get(nodeId);
+                for (WorkerSlot workerSlot : node.getSlotsAvailableToScheduleOn()) {
+                    progressIdx++;
+                    if (progressIdx <= progressIdxForExec[execIndex]) {
+                        continue;
+                    }
+                    progressIdxForExec[execIndex]++;
+                    LOG.debug("backtrackSearch: loopCnt = {}, state.execIndex = {}, node/slot-ordinal = {}, nodeId = {}",
+                        loopCnt, execIndex, progressIdx, nodeId);
+
+                    if (!isExecAssignmentToWorkerValid(workerSlot, state)) {
+                        continue;
+                    }
+
+                    state.incStatesSearched();
                     state.tryToSchedule(execToComp, node, workerSlot);
-
                     if (state.areAllExecsScheduled()) {
                         //Everything is scheduled correctly, so no need to search any more.
+                        LOG.info("backtrackSearch: AllExecsScheduled at loopCnt = {} in {} milliseconds, elapsedtime in state={}",
+                            loopCnt, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
                         return new SolverResult(state, true);
                     }
-
-                    SolverResult results = backtrackSearch(state.nextExecutor());
-                    if (results.success) {
-                        //We found a good result we are done.
-                        return results;
-                    }
-
-                    if (state.areSearchLimitsExceeded()) {
-                        //No need to search more it is not going to help.
-                        return new SolverResult(state, false);
-                    }
-
-                    //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling)
-                    state.backtrack(execToComp, node, workerSlot);
+                    state = state.nextExecutor();
+                    nodeForExec[execIndex] = node;
+                    workerSlotForExec[execIndex] = workerSlot;
+                    LOG.debug("backtrackSearch: Assigned execId={} to node={}, node/slot-ordinal={} at loopCnt={}",
+                        execIndex, nodeId, progressIdx, loopCnt);
+                    continue OUTERMOST_LOOP;
                 }
             }
+            // if here, then the executor was not assigned, backtrack;
+            LOG.debug("backtrackSearch: Failed to schedule execId = {} at loopCnt = {}", execIndex, loopCnt);
+            if (execIndex == 0) {
+                break;
+            } else {
+                state.backtrack(execToComp, nodeForExec[execIndex - 1], workerSlotForExec[execIndex - 1]);
+                progressIdxForExec[execIndex] = -1;
+            }
         }
-        //Tried all of the slots and none of them worked.
-        return new SolverResult(state, false);
+        boolean success = state.areAllExecsScheduled();
+        LOG.info("backtrackSearch: Scheduled={} in {} milliseconds, elapsedtime in state={}",
+            success, System.currentTimeMillis() - startTimeMilli, Time.currentTimeMillis() - state.startTimeMillis);
+        return new SolverResult(state, success);
     }
 
     /**
@@ -531,6 +569,10 @@
             return statesSearched;
         }
 
+        public int getExecSize() {
+            return execs.size();
+        }
+
         public boolean areSearchLimitsExceeded() {
             return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs;
         }
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index f43868d..a46f99e 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -195,12 +195,12 @@
                 @Override
                 protected SolverResult backtrackSearch(SearcherState state) {
                     //Each time we try to schedule a new component simulate taking 1 second longer
-                    Time.advanceTime(1_000);
+                    Time.advanceTime(1_001);
                     return super.backtrackSearch(state);
 
                 }
             };
-            basicFailureTest(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS, 2, cs);
+            basicFailureTest(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS, 1, cs);
         }
     }
 
@@ -237,15 +237,9 @@
         scheduler.schedule(topologies, cluster);
 
         boolean scheduleSuccess = isStatusSuccess(cluster.getStatus(topo.getId()));
-
-        if (parallelismMultiplier == 1) {
-            Assert.assertTrue(scheduleSuccess);
-        } else if (parallelismMultiplier == 20) {
-            // For default JVM, scheduling currently fails due to StackOverflow.
-            // For now just log the results of the test. Change to assert when StackOverflow issue is fixed.
-            LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier", scheduleSuccess ? "succeeds" : "fails",
-                    parallelismMultiplier);
-        }
+        LOG.info("testScheduleLargeExecutorCount scheduling {} with {}x executor multiplier", scheduleSuccess ? "succeeds" : "fails",
+                parallelismMultiplier);
+        Assert.assertTrue(scheduleSuccess);
     }
 
     @Test