Respect Maximum Number Of Attempts for the tasks (#1142)

In this commit, several scheduling parts have been changed in order to
enforce the scheduler to respect maximum number of attempts for
the tasks.

Also, it has been observed that when a task being dropped and
scheduled again, max number of attempts is not being respected.
in this commit, further checks are added to avoid schedule the
tasks again once we reach its maximum number of attempts.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 1032417..d5bc11e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -58,7 +58,7 @@
 
   // For detecting live instance and target resource partition state change in task assignment
   // Used in AbstractTaskDispatcher
-  private boolean _existsLiveInstanceOrCurrentStateChange = false;
+  private boolean _existsLiveInstanceOrCurrentStateOrMessageChange = false;
 
   public WorkflowControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER);
@@ -71,12 +71,14 @@
   }
 
   private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> propertyRefreshed) {
-    // This is for targeted jobs' task assignment. It needs to watch for current state changes for
-    // when targeted resources' state transitions complete
-    _existsLiveInstanceOrCurrentStateChange =
+    // This is for targeted jobs' task assignment. It needs to watch for current state or message
+    // changes for when targeted resources' state transitions complete
+    _existsLiveInstanceOrCurrentStateOrMessageChange =
         // TODO read and update CURRENT_STATE in the BaseControllerDataProvider as well.
-        // This check (and set) is necessary for now since the current state flag in _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
+        // This check (and set) is necessary for now since the current state flag in
+        // _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
         _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || _propertyDataChangedMap.get(HelixConstants.ChangeType.MESSAGE).getAndSet(false)
             || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
             || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
@@ -119,7 +121,7 @@
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
-    _existsLiveInstanceOrCurrentStateChange = true;
+    _existsLiveInstanceOrCurrentStateOrMessageChange = true;
     super.setLiveInstances(liveInstances);
   }
 
@@ -257,8 +259,8 @@
    * task-assigning in AbstractTaskDispatcher.
    * @return
    */
-  public boolean getExistsLiveInstanceOrCurrentStateChange() {
-    return _existsLiveInstanceOrCurrentStateChange;
+  public boolean getExistsLiveInstanceOrCurrentStateOrMessageChange() {
+    return _existsLiveInstanceOrCurrentStateOrMessageChange;
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index ffbdcef..904ecbd 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -20,6 +20,7 @@
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
@@ -74,6 +75,12 @@
       Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
       Map<String, Set<Integer>> tasksToDrop) {
 
+    // If a job is in one of the following states and its tasks are in RUNNING states, the tasks
+    // will be aborted.
+    Set<TaskState> jobStatesForAbortingTasks =
+        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
+            TaskState.FAILED, TaskState.ABORTED));
+
     // Get AssignableInstanceMap for releasing resources for tasks in terminal states
     AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
 
@@ -185,17 +192,11 @@
         switch (currState) {
         case RUNNING: {
           TaskPartitionState nextState = TaskPartitionState.RUNNING;
-          if (jobState == TaskState.TIMING_OUT) {
+          if (jobStatesForAbortingTasks.contains(jobState)) {
             nextState = TaskPartitionState.TASK_ABORTED;
           } else if (jobTgtState == TargetState.STOP) {
             nextState = TaskPartitionState.STOPPED;
-          } else if (jobState == TaskState.ABORTED || jobState == TaskState.FAILED
-              || jobState == TaskState.FAILING || jobState == TaskState.TIMED_OUT) {
-            // Drop tasks if parent job is not in progress
-            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
-            break;
           }
-
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
           assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
@@ -548,8 +549,8 @@
       Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
 
     // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
-    boolean existsLiveInstanceOrCurrentStateChange =
-        cache.getExistsLiveInstanceOrCurrentStateChange();
+    boolean existsLiveInstanceOrCurrentStateOrMessageChangeChange =
+        cache.getExistsLiveInstanceOrCurrentStateOrMessageChange();
 
     // The excludeSet contains the set of task partitions that must be excluded from consideration
     // when making any new assignments.
@@ -560,7 +561,7 @@
       excludeSet.addAll(assignedSet);
     }
     addCompletedTasks(excludeSet, jobCtx, allPartitions);
-    addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
+    addPartitionsReachedMaximumRetries(excludeSet, jobCtx, allPartitions, jobCfg);
     excludeSet.addAll(skippedPartitions);
     Set<Integer> partitionsWithDelay = TaskUtil.getNonReadyPartitions(jobCtx, currentTime);
     excludeSet.addAll(partitionsWithDelay);
@@ -576,7 +577,8 @@
     Set<Integer> partitionsToRetryOnLiveInstanceChangeForTargetedJob = new HashSet<>();
     // If the job is a targeted job, in case of live instance change, we need to assign
     // non-terminal tasks so that they could be re-scheduled
-    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+    if (!TaskUtil.isGenericTaskJob(jobCfg)
+        && existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
       // This job is a targeted job, so FixedAssignmentCalculator will be used
       // There has been a live instance change. Must re-add incomplete task partitions to be
       // re-assigned and re-scheduled
@@ -612,7 +614,8 @@
     }
 
     // If this is a targeted job and if there was a live instance change
-    if (!TaskUtil.isGenericTaskJob(jobCfg) && existsLiveInstanceOrCurrentStateChange) {
+    if (!TaskUtil.isGenericTaskJob(jobCfg)
+        && existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
       // Drop current jobs only if they are assigned to a different instance, regardless of
       // the jobCfg.isRebalanceRunningTask() setting
       dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
@@ -745,8 +748,12 @@
     }
   }
 
-  // add all partitions that have been tried maxNumberAttempts
-  protected static void addGiveupPartitions(Set<Integer> set, JobContext ctx,
+  // Add all partitions/tasks that are cannot be retried. These tasks are:
+  // 1- Task is in ABORTED or ERROR state.
+  // 2- Task has just gone to TIMED_OUT, ERROR or DROPPED states and has reached to its
+  // maxNumberAttempts
+  // These tasks determine whether the job needs to FAILED or not.
+  protected static void addGivenUpPartitions(Set<Integer> set, JobContext ctx,
       Iterable<Integer> pIds, JobConfig cfg) {
     for (Integer pId : pIds) {
       if (isTaskGivenup(ctx, cfg, pId)) {
@@ -755,6 +762,17 @@
     }
   }
 
+  // Add all partitions that have reached their maxNumberAttempts. These tasks should not be
+  // considered for scheduling again.
+  protected static void addPartitionsReachedMaximumRetries(Set<Integer> set, JobContext ctx,
+      Iterable<Integer> pIds, JobConfig cfg) {
+    for (Integer pId : pIds) {
+      if (ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask()) {
+        set.add(pId);
+      }
+    }
+  }
+
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, Set<Integer> throttled, int n) {
     List<Integer> result = new ArrayList<>();
@@ -829,7 +847,8 @@
     if (state == TaskPartitionState.TASK_ABORTED || state == TaskPartitionState.ERROR) {
       return true;
     }
-    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR) {
+    if (state == TaskPartitionState.TIMED_OUT || state == TaskPartitionState.TASK_ERROR
+        || state == TaskPartitionState.DROPPED) {
       return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask();
     }
     return false;
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c2b724b..b10eb5e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -245,7 +245,7 @@
         jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
         partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);
 
-    addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
+    addGivenUpPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
     if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold()
         || (jobCfg.getTargetResource() != null
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
index 2a12568..ae724f0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestForceDeleteWorkflow.java
@@ -304,17 +304,17 @@
     //             JOB1 JOB2
 
     JobConfig.Builder jobBuilder0 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
     JobConfig.Builder jobBuilder1 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
     JobConfig.Builder jobBuilder2 = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setTimeoutPerTask(LONG_TIMEOUT).setMaxAttemptsPerTask(1).setWorkflow(workflowName)
+        .setTimeoutPerTask(LONG_TIMEOUT).setWorkflow(workflowName)
         .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, executionTime,
             DelayedStopTask.JOB_DELAY_CANCEL, stopDelay));
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
new file mode 100644
index 0000000..8683c3f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestMaxNumberOfAttemptsMasterSwitch.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Test to check is maximum number of attempts being respected while target partition is switching
+ * continuously.
+ */
+public class TestMaxNumberOfAttemptsMasterSwitch extends TaskTestBase {
+  private static final String DATABASE = WorkflowGenerator.DEFAULT_TGT_DB;
+  protected HelixDataAccessor _accessor;
+  private List<String> _assignmentList1;
+  private List<String> _assignmentList2;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numPartitions = 1;
+    _numNodes = 3;
+    super.beforeClass();
+    _driver = new TaskDriver(_manager);
+
+    // Assignment1: localhost_12918: Master, localhost_12919:Slave, localhost_12920: Slave
+    _assignmentList1 = new ArrayList<>();
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    _assignmentList1.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+
+    // Assignment2: localhost_12919: Master, localhost_12918:Slave, localhost_12920: Slave
+    _assignmentList2 = new ArrayList<>();
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 1));
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 0));
+    _assignmentList2.add(PARTICIPANT_PREFIX + "_" + (_startPort + 2));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    super.afterClass();
+  }
+
+  @Test
+  public void testMaxNumberOfAttemptsMasterSwitch() throws Exception {
+    String jobQueueName = TestHelper.getTestMethodName();
+    int maxNumberOfAttempts = 5;
+    assignCustomizedIdealState(_assignmentList1);
+
+    JobConfig.Builder jobBuilder0 =
+        new JobConfig.Builder().setWorkflow(jobQueueName).setTargetResource(DATABASE)
+            .setTargetPartitionStates(Sets.newHashSet(MasterSlaveSMD.States.MASTER.name()))
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(maxNumberOfAttempts)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100000"));
+
+    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
+    jobQueue.enqueueJob("JOB0", jobBuilder0);
+    String nameSpacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, "JOB0");
+
+    _driver.start(jobQueue.build());
+    _driver.pollForJobState(jobQueueName, nameSpacedJobName, TaskState.IN_PROGRESS);
+    boolean isAssignmentInIdealState = true;
+
+    // Turn on and off the instance (10 times) and make sure task gets retried and number of
+    // attempts gets incremented every time.
+    // Also make sure that the task won't be retried more than maxNumberOfAttempts
+    for (int i = 1; i <= 2 * maxNumberOfAttempts; i++) {
+      int expectedRetryNumber = Math.min(i, maxNumberOfAttempts);
+      Assert
+          .assertTrue(
+              TestHelper.verify(
+                  () -> (_driver.getJobContext(nameSpacedJobName)
+                      .getPartitionNumAttempts(0) == expectedRetryNumber),
+                  TestHelper.WAIT_DURATION));
+      if (isAssignmentInIdealState) {
+        assignCustomizedIdealState(_assignmentList2);
+        verifyMastership(_assignmentList2);
+        isAssignmentInIdealState = false;
+      } else {
+        assignCustomizedIdealState(_assignmentList1);
+        verifyMastership(_assignmentList1);
+        isAssignmentInIdealState = true;
+      }
+    }
+
+    // Since the task reaches max number of attempts, ths job will fails.
+    _driver.pollForJobState(jobQueueName, nameSpacedJobName, TaskState.FAILED);
+    Assert.assertEquals(_driver.getJobContext(nameSpacedJobName).getPartitionNumAttempts(0),
+        maxNumberOfAttempts);
+  }
+
+  private void assignCustomizedIdealState(List<String> _assignmentList) {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, DATABASE);
+    idealState.setPartitionState(DATABASE + "_0", _assignmentList.get(0), "MASTER");
+    idealState.setPartitionState(DATABASE + "_0", _assignmentList.get(1), "SLAVE");
+    idealState.setPartitionState(DATABASE + "_0", _assignmentList.get(2), "SLAVE");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, DATABASE,
+        idealState);
+  }
+
+  private void verifyMastership(List<String> _assignmentList) throws Exception {
+    String instance = _assignmentList.get(0);
+    boolean isMasterSwitchedToCorrectInstance = TestHelper.verify(() -> {
+      ExternalView externalView =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, DATABASE);
+      if (externalView == null) {
+        return false;
+      }
+      Map<String, String> stateMap = externalView.getStateMap(DATABASE + "_0");
+      if (stateMap == null) {
+        return false;
+      }
+      return "MASTER".equals(stateMap.get(instance));
+    }, TestHelper.WAIT_DURATION);
+    Assert.assertTrue(isMasterSwitchedToCorrectInstance);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
index 25cab50..08dc776 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java
@@ -58,9 +58,9 @@
     stopTestSetup(5);
 
     String jobQueueName = TestHelper.getTestMethodName();
-    JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
-        .setMaxAttemptsPerTask(1).setWorkflow(jobQueueName)
-        .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
+    JobConfig.Builder jobBuilder =
+        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setWorkflow(jobQueueName)
+            .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));
 
     JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
     jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index b79dcb9..e849be2 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -93,7 +93,7 @@
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
         _liveInstances, _instanceConfigs);
     when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
-    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(true);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateOrMessageChange()).thenReturn(true);
     Set<String> inflightJobDag = new HashSet<>();
     inflightJobDag.add(JOB_NAME);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())
@@ -130,7 +130,7 @@
     _assignableInstanceManager.buildAssignableInstances(_clusterConfig, mock._taskDataCache,
         _liveInstances, _instanceConfigs);
     when(mock._cache.getAssignableInstanceManager()).thenReturn(_assignableInstanceManager);
-    when(mock._cache.getExistsLiveInstanceOrCurrentStateChange()).thenReturn(false);
+    when(mock._cache.getExistsLiveInstanceOrCurrentStateOrMessageChange()).thenReturn(false);
     Set<String> inflightJobDag = new HashSet<>();
     inflightJobDag.add(JOB_NAME);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME).getInflightJobList())