TEZ-3696. Jobs can hang when both concurrency and speculation are enabled. Contributed by Eric Badger
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 87a6261..929f0c8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -18,21 +18,22 @@
 
 package org.apache.tez.dag.app.dag;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Queue;
 
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 public abstract class DAGScheduler {
   private static class VertexInfo {
     int concurrencyLimit;
     int concurrency;
-    Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList();
-    
+    Map<TezTaskAttemptID, DAGEventSchedulerUpdate> pendingAttempts =
+        new LinkedHashMap<TezTaskAttemptID, DAGEventSchedulerUpdate>();
+
     VertexInfo(int limit) {
       this.concurrencyLimit = limit;
     }
@@ -42,7 +43,7 @@
   
   public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
     if (vertexInfo == null) {
-      vertexInfo = Maps.newHashMap();
+      vertexInfo = new HashMap<TezVertexID, VertexInfo>();
     }
     if (concurrency > 0) {
       vertexInfo.put(vId, new VertexInfo(concurrency));
@@ -60,7 +61,7 @@
   private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
     if (vInfo != null) {
       if (vInfo.concurrency >= vInfo.concurrencyLimit) {
-        vInfo.pendingAttempts.add(event);
+        vInfo.pendingAttempts.put(event.getAttempt().getID(), event);
         return; // already at max concurrency
       }
       vInfo.concurrency++;
@@ -73,9 +74,14 @@
     if (vertexInfo != null) {
       VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
       if (vInfo != null) {
-        vInfo.concurrency--;
-        if (!vInfo.pendingAttempts.isEmpty()) {
-          scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo);
+        if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) {
+          vInfo.concurrency--;
+          if(!vInfo.pendingAttempts.isEmpty()) {
+            Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator();
+            DAGEventSchedulerUpdate nextTaskAttempt = i.next();
+            i.remove();
+            scheduleTaskWithLimit(nextTaskAttempt, vInfo);
+          }
         }
       }
     }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2dfd7f2..8d4106c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -963,9 +963,7 @@
   private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
       TaskAttemptStateInternal attemptState) {
     this.sendTaskAttemptCompletionEvent(attemptId, attemptState);
-    if (getInternalState() != TaskStateInternal.SUCCEEDED) {
-      sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action
-    }
+    sendDAGSchedulerFinishedEvent(attemptId); // also sent when the task is already SUCCEEDED
   }
 
   private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index f2fd933..f38f689 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -195,7 +195,61 @@
     scheduled++;
 
   }
-  
-  
-  
+
+  @Test(timeout=5000)
+  public void testConcurrencyLimitWithKilledNonRunningTask() {
+    MockEventHandler mockEventHandler = new MockEventHandler();
+    DAG mockDag = mock(DAG.class);
+    when(mockDag.getTotalVertices()).thenReturn(2);
+    TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+    TezTaskID tId0 = TezTaskID.getInstance(vId0, 0);
+
+    TaskAttempt mockAttempt;
+
+    Vertex mockVertex = mock(Vertex.class);
+    when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+    when(mockVertex.getDistanceFromRoot()).thenReturn(0);
+    when(mockVertex.getVertexId()).thenReturn(vId0);
+
+    DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
+        mockEventHandler);
+
+    List<TaskAttempt> mockAttempts = Lists.newArrayList();
+    int completed = 0; // NOTE(review): never read in this test
+    int requested = 0;
+    int scheduled = 0;
+    scheduler.addVertexConcurrencyLimit(vId0, 1); // effective: limit of 1 for vId0
+
+    // first attempt is within the limit, so it is scheduled immediately
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+    Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+        mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+    scheduled++;
+
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered: limit reached
+
+    mockAttempt = mock(TaskAttempt.class);
+    mockAttempts.add(mockAttempt);
+    when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++));
+    scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+    Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered: limit reached
+
+    scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(1)));
+    Assert.assertEquals(scheduled, mockEventHandler.events.size()); // nothing new scheduled
+    Assert.assertEquals(mockAttempts.get(0).getID(),
+        mockEventHandler.events.get(0).getTaskAttemptID()); // sole event is still attempt 0
+  }
+
 }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index b7a6d21..1236ced0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -781,6 +782,48 @@
   }
 
   @Test(timeout = 20000)
+  public void testKilledAttemptUpdatesDAGScheduler() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative attempt so that two attempts are running at once
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+      TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Have the first attempt succeed
+    eventHandler.events.clear();
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+      TaskEventType.T_ATTEMPT_SUCCEEDED));
+    verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
+        VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED);
+
+    // The task should now have succeeded and sent a kill request to the spec attempt
+    assertTaskSucceededState();
+    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+        eq(firstAttempt.getID().getId()));
+    @SuppressWarnings("rawtypes")
+    Event event = eventHandler.events.get(eventHandler.events.size()-1);
+    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType());
+    assertEquals(specAttempt.getID(),
+        ((TaskAttemptEventKillRequest) event).getTaskAttemptID());
+
+    eventHandler.events.clear();
+    // Emulate the spec attempt being killed; the DAG scheduler must still be notified
+    mockTask.handle(new TaskEventTAUpdate(specAttempt
+      .getID(), TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskSucceededState();
+    verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE,
+        VertexEventType.V_TASK_ATTEMPT_COMPLETED);
+  }
+
+  @Test(timeout = 20000)
   public void testSpeculatedThenRetroactiveFailure() {
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);