MAPREDUCE-1980. Fixes TaskAttemptUnsuccessfulCompletionEvent and TaskAttemptFinishedEvent to correctly log event type for all task types. Contributed by Amar Kamat
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@985798 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d00f929..bb2a098 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1750,3 +1750,8 @@
MAPREDUCE-2012. Some contrib tests fail in branch 0.21 and trunk.
(Amareshwari Sriramadasu via tomwhite)
+
+ MAPREDUCE-1980. Fixes TaskAttemptUnsuccessfulCompletionEvent and
+ TaskAttemptFinishedEvent to correctly log event type for all task types.
+ (Amar Kamat via amareshwari)
+
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
index cc3692d..fff9232 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
@@ -92,7 +92,11 @@
Counters getCounters() { return EventReader.fromAvro(datum.counters); }
/** Get the event type */
public EventType getEventType() {
- return EventType.MAP_ATTEMPT_FINISHED;
+ // Note that the task type can be setup/map/reduce/cleanup but the
+ // attempt-type can only be map/reduce.
+ return getTaskId().getTaskType() == TaskType.MAP
+ ? EventType.MAP_ATTEMPT_FINISHED
+ : EventType.REDUCE_ATTEMPT_FINISHED;
}
}
diff --git a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
index 5960fe8..c23f792 100644
--- a/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
+++ b/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -87,7 +88,17 @@
public String getTaskStatus() { return datum.status.toString(); }
/** Get the event type */
public EventType getEventType() {
- return EventType.MAP_ATTEMPT_KILLED;
+ // Note that the task type can be setup/map/reduce/cleanup but the
+ // attempt-type can only be map/reduce.
+ // find out if the task failed or got killed
+ boolean failed = TaskStatus.State.FAILED.toString().equals(getTaskStatus());
+ return getTaskId().getTaskType() == TaskType.MAP
+ ? (failed
+ ? EventType.MAP_ATTEMPT_FAILED
+ : EventType.MAP_ATTEMPT_KILLED)
+ : (failed
+ ? EventType.REDUCE_ATTEMPT_FAILED
+ : EventType.REDUCE_ATTEMPT_KILLED);
}
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java b/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
index 29f19c3..aaea969 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapreduce.jobhistory;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -27,38 +29,124 @@
*/
public class TestJobHistoryEvents extends TestCase {
/**
- * Test TaskAttemptStartedEvent.
+ * Test {@link TaskAttemptStartedEvent} for various task types.
+ */
+ private static void testAttemptStartedEventForTypes(EventType expected,
+ TaskAttemptID id,
+ TaskType[] types) {
+ for (TaskType t : types) {
+ TaskAttemptStartedEvent tase =
+ new TaskAttemptStartedEvent(id, t, 0L, "", 0);
+ assertEquals(expected, tase.getEventType());
+ }
+ }
+
+ /**
+ * Test {@link TaskAttemptStartedEvent}.
*/
public void testTaskAttemptStartedEvent() {
EventType expected = EventType.MAP_ATTEMPT_STARTED;
TaskAttemptID fakeId = new TaskAttemptID("1234", 1, TaskType.MAP, 1, 1);
- // check jobsetup type
- TaskAttemptStartedEvent tase =
- new TaskAttemptStartedEvent(fakeId, TaskType.JOB_SETUP, 0L, "", 0);
- assertEquals(expected, tase.getEventType());
-
- // check jobcleanup type
- tase = new TaskAttemptStartedEvent(fakeId, TaskType.JOB_CLEANUP, 0L, "", 0);
- assertEquals(expected, tase.getEventType());
-
- // check map type
- tase = new TaskAttemptStartedEvent(fakeId, TaskType.MAP, 0L, "", 0);
- assertEquals(expected, tase.getEventType());
+ // check the events for job-setup, job-cleanup and map task-types
+ testAttemptStartedEventForTypes(expected, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.MAP});
expected = EventType.REDUCE_ATTEMPT_STARTED;
fakeId = new TaskAttemptID("1234", 1, TaskType.REDUCE, 1, 1);
- // check jobsetup type
- tase = new TaskAttemptStartedEvent(fakeId, TaskType.JOB_SETUP, 0L, "", 0);
- assertEquals(expected, tase.getEventType());
+ // check the events for job-setup, job-cleanup and reduce task-types
+ testAttemptStartedEventForTypes(expected, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.REDUCE});
+ }
+
+ /**
+ * Test {@link TaskAttemptUnsuccessfulCompletionEvent} for various task types.
+ */
+ private static void testFailedKilledEventsForTypes(EventType expected,
+ TaskAttemptID id,
+ TaskType[] types,
+ String state) {
+ for (TaskType t : types) {
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ new TaskAttemptUnsuccessfulCompletionEvent(id, t, state, 0L, "", "");
+ assertEquals(expected, tauce.getEventType());
+ }
+ }
+
+ /**
+ * Test {@link TaskAttemptUnsuccessfulCompletionEvent} for killed/failed task.
+ */
+ public void testTaskAttemptUnsuccessfulCompletionEvent() {
+ TaskAttemptID fakeId = new TaskAttemptID("1234", 1, TaskType.MAP, 1, 1);
- // check jobcleanup type
- tase = new TaskAttemptStartedEvent(fakeId, TaskType.JOB_CLEANUP, 0L, "", 0);
- assertEquals(expected, tase.getEventType());
+ // check killed events for job-setup, job-cleanup and map task-types
+ testFailedKilledEventsForTypes(EventType.MAP_ATTEMPT_KILLED, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.MAP},
+ TaskStatus.State.KILLED.toString());
+ // check failed events for job-setup, job-cleanup and map task-types
+ testFailedKilledEventsForTypes(EventType.MAP_ATTEMPT_FAILED, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.MAP},
+ TaskStatus.State.FAILED.toString());
- // check reduce type
- tase = new TaskAttemptStartedEvent(fakeId, TaskType.REDUCE, 0L, "", 0);
- assertEquals(expected, tase.getEventType());
+ fakeId = new TaskAttemptID("1234", 1, TaskType.REDUCE, 1, 1);
+
+ // check killed events for job-setup, job-cleanup and reduce task-types
+ testFailedKilledEventsForTypes(EventType.REDUCE_ATTEMPT_KILLED, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.REDUCE},
+ TaskStatus.State.KILLED.toString());
+ // check failed events for job-setup, job-cleanup and reduce task-types
+ testFailedKilledEventsForTypes(EventType.REDUCE_ATTEMPT_FAILED, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.REDUCE},
+ TaskStatus.State.FAILED.toString());
+ }
+
+ /**
+ * Test {@link TaskAttemptFinishedEvent} for various task types.
+ */
+ private static void testFinishedEventsForTypes(EventType expected,
+ TaskAttemptID id,
+ TaskType[] types) {
+ for (TaskType t : types) {
+ TaskAttemptFinishedEvent tafe =
+ new TaskAttemptFinishedEvent(id, t,
+ TaskStatus.State.SUCCEEDED.toString(), 0L, "", "", new Counters());
+ assertEquals(expected, tafe.getEventType());
+ }
+ }
+
+ /**
+ * Test {@link TaskAttemptFinishedEvent} for finished task.
+ */
+ public void testTaskAttemptFinishedEvent() {
+ EventType expected = EventType.MAP_ATTEMPT_FINISHED;
+ TaskAttemptID fakeId = new TaskAttemptID("1234", 1, TaskType.MAP, 1, 1);
+
+ // check the events for job-setup, job-cleanup and map task-types
+ testFinishedEventsForTypes(expected, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.MAP});
+
+ expected = EventType.REDUCE_ATTEMPT_FINISHED;
+ fakeId = new TaskAttemptID("1234", 1, TaskType.REDUCE, 1, 1);
+
+ // check the events for job-setup, job-cleanup and reduce task-types
+ testFinishedEventsForTypes(expected, fakeId,
+ new TaskType[] {TaskType.JOB_SETUP,
+ TaskType.JOB_CLEANUP,
+ TaskType.REDUCE});
}
}