MAPREDUCE-1980. Fixes TaskAttemptUnsuccessfulCompletionEvent and TaskAttemptFinishedEvent to correctly log event type for all task types. Contributed by Amar Kamat

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@985798 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d00f929..bb2a098 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1750,3 +1750,8 @@
 
     MAPREDUCE-2012. Some contrib tests fail in branch 0.21 and trunk.
     (Amareshwari Sriramadasu via tomwhite)
+
+    MAPREDUCE-1980. Fixes TaskAttemptUnsuccessfulCompletionEvent and
+    TaskAttemptFinishedEvent to correctly log event type for all task types.
+    (Amar Kamat via amareshwari)
+
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index cc3692d..fff9232 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -92,7 +92,11 @@
   Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_FINISHED;
+    // Note that the task type can be setup/map/reduce/cleanup but the 
+    // attempt-type can only be map/reduce.
+    return getTaskId().getTaskType() == TaskType.MAP 
+           ? EventType.MAP_ATTEMPT_FINISHED
+           : EventType.REDUCE_ATTEMPT_FINISHED;
   }
 
 }
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 5960fe8..c23f792 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -87,7 +88,17 @@
   public String getTaskStatus() { return datum.status.toString(); }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_KILLED;
+    // Note that the task type can be setup/map/reduce/cleanup but the 
+    // attempt-type can only be map/reduce.
+    // find out if the task failed or got killed
+    boolean failed = TaskStatus.State.FAILED.toString().equals(getTaskStatus());
+    return getTaskId().getTaskType() == TaskType.MAP 
+           ? (failed 
+              ? EventType.MAP_ATTEMPT_FAILED
+              : EventType.MAP_ATTEMPT_KILLED)
+           : (failed
+              ? EventType.REDUCE_ATTEMPT_FAILED
+              : EventType.REDUCE_ATTEMPT_KILLED);
   }
 
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java b/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
index 29f19c3..aaea969 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -27,38 +29,124 @@
  */
 public class TestJobHistoryEvents extends TestCase {
   /**
-   * Test TaskAttemptStartedEvent.
+   * Test {@link TaskAttemptStartedEvent} for various task types.
+   */
+  private static void testAttemptStartedEventForTypes(EventType expected, 
+                                                      TaskAttemptID id,
+                                                      TaskType[] types) {
+    for (TaskType t : types) {
+      TaskAttemptStartedEvent tase = 
+        new TaskAttemptStartedEvent(id, t, 0L, "", 0);
+      assertEquals(expected, tase.getEventType());
+    }
+  }
+  
+  /**
+   * Test {@link TaskAttemptStartedEvent}.
    */
   public void testTaskAttemptStartedEvent() {
     EventType expected = EventType.MAP_ATTEMPT_STARTED;
     TaskAttemptID fakeId = new TaskAttemptID("1234", 1, TaskType.MAP, 1, 1);
     
-    // check jobsetup type
-    TaskAttemptStartedEvent tase = 
-      new TaskAttemptStartedEvent(fakeId, TaskType.JOB_SETUP, 0L, "", 0);
-    assertEquals(expected, tase.getEventType());
-    
-    // check jobcleanup type
-    tase = new TaskAttemptStartedEvent(fakeId, TaskType.JOB_CLEANUP, 0L, "", 0);
-    assertEquals(expected, tase.getEventType());
-    
-    // check map type
-    tase = new TaskAttemptStartedEvent(fakeId, TaskType.MAP, 0L, "", 0);
-    assertEquals(expected, tase.getEventType());
+    // check the events for job-setup, job-cleanup and map task-types
+    testAttemptStartedEventForTypes(expected, fakeId,
+                                    new TaskType[] {TaskType.JOB_SETUP, 
+                                                    TaskType.JOB_CLEANUP, 
+                                                    TaskType.MAP});
     
     expected = EventType.REDUCE_ATTEMPT_STARTED;
     fakeId = new TaskAttemptID("1234", 1, TaskType.REDUCE, 1, 1);
     
-    // check jobsetup type
-    tase = new TaskAttemptStartedEvent(fakeId, TaskType.JOB_SETUP, 0L, "", 0);
-    assertEquals(expected, tase.getEventType());
+    // check the events for job-setup, job-cleanup and reduce task-types
+    testAttemptStartedEventForTypes(expected, fakeId,
+                                    new TaskType[] {TaskType.JOB_SETUP, 
+                                                    TaskType.JOB_CLEANUP, 
+                                                    TaskType.REDUCE});
+  }
+  
+  /**
+   * Test {@link TaskAttemptUnsuccessfulCompletionEvent} for various task types.
+   */
+  private static void testFailedKilledEventsForTypes(EventType expected, 
+                                                     TaskAttemptID id,
+                                                     TaskType[] types,
+                                                     String state) {
+    for (TaskType t : types) {
+      TaskAttemptUnsuccessfulCompletionEvent tauce = 
+        new TaskAttemptUnsuccessfulCompletionEvent(id, t, state, 0L, "", "");
+      assertEquals(expected, tauce.getEventType());
+    }
+  }
+  
+  /**
+   * Test {@link TaskAttemptUnsuccessfulCompletionEvent} for killed/failed task.
+   */
+  public void testTaskAttemptUnsuccessfulCompletionEvent() {
+    TaskAttemptID fakeId = new TaskAttemptID("1234", 1, TaskType.MAP, 1, 1);
     
-    // check jobcleanup type
-    tase = new TaskAttemptStartedEvent(fakeId, TaskType.JOB_CLEANUP, 0L, "", 0);
-    assertEquals(expected, tase.getEventType());
+    // check killed events for job-setup, job-cleanup and map task-types
+    testFailedKilledEventsForTypes(EventType.MAP_ATTEMPT_KILLED, fakeId,
+                                   new TaskType[] {TaskType.JOB_SETUP, 
+                                                   TaskType.JOB_CLEANUP, 
+                                                   TaskType.MAP},
+                                   TaskStatus.State.KILLED.toString());
+    // check failed events for job-setup, job-cleanup and map task-types
+    testFailedKilledEventsForTypes(EventType.MAP_ATTEMPT_FAILED, fakeId,
+                                   new TaskType[] {TaskType.JOB_SETUP, 
+                                                   TaskType.JOB_CLEANUP, 
+                                                   TaskType.MAP},
+                                   TaskStatus.State.FAILED.toString());
     
-    // check reduce type
-    tase = new TaskAttemptStartedEvent(fakeId, TaskType.REDUCE, 0L, "", 0);
-    assertEquals(expected, tase.getEventType());
+    fakeId = new TaskAttemptID("1234", 1, TaskType.REDUCE, 1, 1);
+    
+    // check killed events for job-setup, job-cleanup and reduce task-types
+    testFailedKilledEventsForTypes(EventType.REDUCE_ATTEMPT_KILLED, fakeId,
+                                   new TaskType[] {TaskType.JOB_SETUP, 
+                                                   TaskType.JOB_CLEANUP, 
+                                                   TaskType.REDUCE},
+                                   TaskStatus.State.KILLED.toString());
+    // check failed events for job-setup, job-cleanup and reduce task-types
+    testFailedKilledEventsForTypes(EventType.REDUCE_ATTEMPT_FAILED, fakeId,
+                                   new TaskType[] {TaskType.JOB_SETUP, 
+                                                   TaskType.JOB_CLEANUP, 
+                                                   TaskType.REDUCE},
+                                   TaskStatus.State.FAILED.toString());
+  }
+  
+  /**
+   * Test {@link TaskAttemptFinishedEvent} for various task types.
+   */
+  private static void testFinishedEventsForTypes(EventType expected, 
+                                                 TaskAttemptID id,
+                                                 TaskType[] types) {
+    for (TaskType t : types) {
+      TaskAttemptFinishedEvent tafe = 
+        new TaskAttemptFinishedEvent(id, t, 
+            TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", new Counters());
+      assertEquals(expected, tafe.getEventType());
+    }
+  }
+  
+  /**
+   * Test {@link TaskAttemptFinishedEvent} for finished task.
+   */
+  public void testTaskAttemptFinishedEvent() {
+    EventType expected = EventType.MAP_ATTEMPT_FINISHED;
+    TaskAttemptID fakeId = new TaskAttemptID("1234", 1, TaskType.MAP, 1, 1);
+    
+    // check the events for job-setup, job-cleanup and map task-types
+    testFinishedEventsForTypes(expected, fakeId,
+                               new TaskType[] {TaskType.JOB_SETUP, 
+                                               TaskType.JOB_CLEANUP, 
+                                               TaskType.MAP});
+    
+    expected = EventType.REDUCE_ATTEMPT_FINISHED;
+    fakeId = new TaskAttemptID("1234", 1, TaskType.REDUCE, 1, 1);
+    
+    // check the events for job-setup, job-cleanup and reduce task-types
+    testFinishedEventsForTypes(expected, fakeId,
+                               new TaskType[] {TaskType.JOB_SETUP, 
+                                               TaskType.JOB_CLEANUP, 
+                                               TaskType.REDUCE});
   }
 }