TEZ-4108. NullPointerException during speculative execution race condition
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2d0688f..39e2b4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1029,7 +1029,9 @@
         // find the oldest running attempt
         if (!ta.isFinished()) {
           earliestUnfinishedAttempt = ta;
-          task.nodesWithRunningAttempts.add(ta.getNodeId());
+          if (ta.getNodeId() != null) {
+            task.nodesWithRunningAttempts.add(ta.getNodeId());
+          }
         } else {
           if (TaskAttemptState.SUCCEEDED.equals(ta.getState())) {
             LOG.info("Ignore speculation scheduling for task {} since it has succeeded with attempt {}.",
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index a3de936..1af6092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -887,6 +887,45 @@
     assertEquals(2, mockTask.getAttemptList().size());
   }
 
+  @Test
+  public void testKilledBeforeSpeculatedSucceeded() {
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, vertex);
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    mockTask.handle(createTaskTAKilledEvent(firstAttempt.getID()));
+    assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());
+
+    // Temporarily clear mockNodeId (restored below) so that the add-speculative-attempt
+    // transition sees a null node id; this is the NPE scenario reported in TEZ-4108.
+    // NOTE(review): presumably mock attempts return mockNodeId from getNodeId(); confirm.
+    NodeId nodeId = mockNodeId;
+    mockNodeId = null;
+
+    // Add a speculative task attempt while the node id is still null
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    mockNodeId = nodeId;
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(3, mockTask.getAttemptList().size());
+
+    // Succeed the speculative attempt; the task must succeed with no extra attempts created
+    updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+    mockTask.handle(createTaskTASucceededEvent(specAttempt.getID()));
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+    assertEquals(3, mockTask.getAttemptList().size());
+  }
+
   @Test(timeout = 20000)
   public void testKilledAttemptUpdatesDAGScheduler() {
     TezTaskID taskId = getNewTaskID();