TEZ-3696. Jobs can hang when both concurrency and speculation are enabled. Contributed by Eric Badger
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 87a6261..929f0c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -18,21 +18,22 @@
package org.apache.tez.dag.app.dag;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Queue;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
public abstract class DAGScheduler {
private static class VertexInfo {
int concurrencyLimit;
int concurrency;
- Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList();
-
+ Map<TezTaskAttemptID, DAGEventSchedulerUpdate> pendingAttempts =
+ new LinkedHashMap<TezTaskAttemptID, DAGEventSchedulerUpdate>();
+
VertexInfo(int limit) {
this.concurrencyLimit = limit;
}
@@ -42,7 +43,7 @@
public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
if (vertexInfo == null) {
- vertexInfo = Maps.newHashMap();
+ vertexInfo = new HashMap<TezVertexID, VertexInfo>();
}
if (concurrency > 0) {
vertexInfo.put(vId, new VertexInfo(concurrency));
@@ -60,7 +61,7 @@
private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
if (vInfo != null) {
if (vInfo.concurrency >= vInfo.concurrencyLimit) {
- vInfo.pendingAttempts.add(event);
+ vInfo.pendingAttempts.put(event.getAttempt().getID(), event);
return; // already at max concurrency
}
vInfo.concurrency++;
@@ -73,9 +74,14 @@
if (vertexInfo != null) {
VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
if (vInfo != null) {
- vInfo.concurrency--;
- if (!vInfo.pendingAttempts.isEmpty()) {
- scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo);
+ if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) {
+ vInfo.concurrency--;
+ if(!vInfo.pendingAttempts.isEmpty()) {
+ Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator();
+ DAGEventSchedulerUpdate nextTaskAttempt = i.next();
+ i.remove();
+ scheduleTaskWithLimit(nextTaskAttempt, vInfo);
+ }
}
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2dfd7f2..8d4106c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -963,9 +963,7 @@
private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
TaskAttemptStateInternal attemptState) {
this.sendTaskAttemptCompletionEvent(attemptId, attemptState);
- if (getInternalState() != TaskStateInternal.SUCCEEDED) {
- sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action
- }
+ sendDAGSchedulerFinishedEvent(attemptId); // sent for every finished attempt, even when the task is already SUCCEEDED
}
private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index f2fd933..f38f689 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -195,7 +195,61 @@
scheduled++;
}
-
-
-
+
+ @Test(timeout=5000)
+ public void testConcurrencyLimitWithKilledNonRunningTask() {
+ MockEventHandler mockEventHandler = new MockEventHandler();
+ DAG mockDag = mock(DAG.class);
+ when(mockDag.getTotalVertices()).thenReturn(2);
+ TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+ TezTaskID tId0 = TezTaskID.getInstance(vId0, 0);
+
+ TaskAttempt mockAttempt;
+
+ Vertex mockVertex = mock(Vertex.class);
+ when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+ when(mockVertex.getDistanceFromRoot()).thenReturn(0);
+ when(mockVertex.getVertexId()).thenReturn(vId0);
+
+ DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
+ mockEventHandler);
+
+ List<TaskAttempt> mockAttempts = Lists.newArrayList();
+ int completed = 0; // NOTE(review): never read in this test
+ int requested = 0;
+ int scheduled = 0;
+ scheduler.addVertexConcurrencyLimit(vId0, 1); // allow a single concurrent attempt on vId0
+
+ // first attempt fits within the limit of 1 and is scheduled immediately
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // second attempt exceeds the limit: buffered
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // third attempt is buffered as well
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(1))); // completes the second attempt, which is still buffered
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // no additional attempt gets scheduled
+ Assert.assertEquals(mockAttempts.get(0).getID(),
+ mockEventHandler.events.get(0).getTaskAttemptID()); // the only scheduled attempt is still the first one
+ }
+
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index b7a6d21..1236ced0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -781,6 +782,48 @@
}
@Test(timeout = 20000)
+ public void testKilledAttemptUpdatesDAGScheduler() {
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+ // Add a speculative attempt alongside the running first attempt
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Have the first attempt succeed
+ eventHandler.events.clear();
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
+ VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED);
+
+ // The task should now have succeeded and sent a kill request to the speculative attempt
+ assertTaskSucceededState();
+ verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+ eq(firstAttempt.getID().getId()));
+ @SuppressWarnings("rawtypes")
+ Event event = eventHandler.events.get(eventHandler.events.size()-1);
+ assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType());
+ assertEquals(specAttempt.getID(),
+ ((TaskAttemptEventKillRequest) event).getTaskAttemptID());
+
+ eventHandler.events.clear();
+ // Emulate the speculative attempt being killed after the task has already succeeded
+ mockTask.handle(new TaskEventTAUpdate(specAttempt
+ .getID(), TaskEventType.T_ATTEMPT_KILLED));
+ assertTaskSucceededState();
+ verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
+ VertexEventType.V_TASK_ATTEMPT_COMPLETED); // a DAG_SCHEDULER_UPDATE is still expected for the killed attempt
+ }
+
+ @Test(timeout = 20000)
public void testSpeculatedThenRetroactiveFailure() {
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);