TEZ-4062. Speculative attempt scheduling should be aborted when Task has completed
Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 9289d8f..e563fe9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1030,8 +1030,19 @@
if (!ta.isFinished()) {
earliestUnfinishedAttempt = ta;
task.nodesWithRunningAttempts.add(ta.getNodeId());
+ } else {
+ if (TaskAttemptState.SUCCEEDED.equals(ta.getState())) {
+ LOG.info("Ignore speculation scheduling for task {} since it has succeeded with attempt {}.",
+ task.getTaskId(), ta.getID());
+ return;
+ }
}
}
+ if (earliestUnfinishedAttempt == null) {
+ // no running (or SUCCEEDED) task attempt at this moment, no need to schedule speculative attempt either
+ LOG.info("Ignore speculation scheduling since there is no running attempt on task {}.", task.getTaskId());
+ return;
+ }
task.addAndScheduleAttempt(earliestUnfinishedAttempt.getID());
}
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 81cd675..2d4adcc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -982,6 +982,23 @@
Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
}
+ @Test(timeout = 20000)
+ public void testIgnoreSpeculationOnSuccessfulOriginalAttempt() {
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ // Mock success of the first task attempt
+ updateAttemptState(firstAttempt, TaskAttemptState.SUCCEEDED);
+ firstAttempt.handle(new TaskAttemptEvent(firstAttempt.getID(), TaskAttemptEventType.TA_DONE));
+
+ // Verify the speculation scheduling is ignored and no speculative attempt was added to the task
+ mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID()));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ assertEquals(1, mockTask.getAttemptList().size());
+ }
+
@SuppressWarnings("rawtypes")
@Test
public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {