blob: 827e727e5641c2987824ec89275e4034caded13a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.util.Arrays;
import java.util.Iterator;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.junit.Test;
/**
 * Exercises the fetch-failure path of the MapReduce application master:
 * a reduce attempt reports repeated fetch failures against an already
 * succeeded map attempt, the test then waits for the map task to go back to
 * RUNNING with a second attempt, and finally checks the sequence and status
 * of the job's task-attempt completion events.
 */
public class TestFetchFailure {

  /** Number of fetch-failure reports the test sends to trigger map re-execution. */
  private static final int FETCH_FAILURES_TO_SEND = 3;

  /**
   * Runs a job whose task map is asserted to hold exactly two tasks (taken,
   * in iteration order, as the map task and the reduce task), fails the
   * first map attempt through fetch-failure reports, lets a second map
   * attempt and the reduce attempt finish, and verifies the resulting
   * completion events.
   *
   * @throws Exception if the application or any awaited state transition fails
   */
  @Test
  public void testFetchFailure() throws Exception {
    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true);
    Configuration conf = new Configuration();
    // map -> reduce -> fetch-failure -> map retry is incompatible with the
    // sequential, single-task-attempt approach in uber-AM, so disable it:
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    Job job = app.submit(conf);
    app.waitForState(job, JobState.RUNNING);

    // all tasks should have been created by now
    Assert.assertEquals("Num tasks not correct",
        2, job.getTasks().size());
    Iterator<Task> taskIt = job.getTasks().values().iterator();
    Task mapTask = taskIt.next();
    Task reduceTask = taskIt.next();

    // wait for the map task and its first attempt to reach RUNNING
    app.waitForState(mapTask, TaskState.RUNNING);
    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);

    // send the done signal to the map attempt and wait for map success
    sendDone(app, mapAttempt1);
    app.waitForState(mapTask, TaskState.SUCCEEDED);

    TaskAttemptCompletionEvent[] events =
        job.getTaskAttemptCompletionEvents(0, 100);
    Assert.assertEquals("Num completion events not correct",
        1, events.length);
    Assert.assertEquals("Event status not correct",
        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());

    // wait for the reduce task and its attempt to start running
    app.waitForState(reduceTask, TaskState.RUNNING);
    TaskAttempt reduceAttempt =
        reduceTask.getAttempts().values().iterator().next();
    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);

    // send fetch failures from the reduce to trigger map re-execution
    for (int i = 0; i < FETCH_FAILURES_TO_SEND; i++) {
      sendFetchFailure(app, reduceAttempt, mapAttempt1);
    }

    // wait for the map task state to move back to RUNNING
    app.waitForState(mapTask, TaskState.RUNNING);

    // the first map attempt must have become FAILED
    Assert.assertEquals("Map TaskAttempt state not correct",
        TaskAttemptState.FAILED, mapAttempt1.getState());

    Assert.assertEquals("Num attempts in Map Task not correct",
        2, mapTask.getAttempts().size());

    // skip the first attempt; the second one is the retry
    Iterator<TaskAttempt> attemptIt = mapTask.getAttempts().values().iterator();
    attemptIt.next();
    TaskAttempt mapAttempt2 = attemptIt.next();
    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);

    // send the done signal to the second map attempt and wait for map success
    sendDone(app, mapAttempt2);
    app.waitForState(mapTask, TaskState.SUCCEEDED);

    // send done to the reduce and wait for the job to finish
    sendDone(app, reduceAttempt);
    app.waitForState(job, JobState.SUCCEEDED);

    // the completion event fetched earlier is now expected to read OBSOLETE
    Assert.assertEquals("Event status not correct",
        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());

    events = job.getTaskAttemptCompletionEvents(0, 100);
    Assert.assertEquals("Num completion events not correct",
        4, events.length);
    Assert.assertEquals("Event map attempt id not correct",
        mapAttempt1.getID(), events[0].getAttemptId());
    Assert.assertEquals("Event map attempt id not correct",
        mapAttempt1.getID(), events[1].getAttemptId());
    Assert.assertEquals("Event map attempt id not correct",
        mapAttempt2.getID(), events[2].getAttemptId());
    Assert.assertEquals("Event reduce attempt id not correct",
        reduceAttempt.getID(), events[3].getAttemptId());
    Assert.assertEquals("Event status not correct for map attempt1",
        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
    Assert.assertEquals("Event status not correct for map attempt1",
        TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
    Assert.assertEquals("Event status not correct for map attempt2",
        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
    Assert.assertEquals("Event status not correct for reduce attempt1",
        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
  }

  /**
   * Dispatches a {@code TA_DONE} event for the given attempt through the
   * application's event handler.
   *
   * @param app the application whose event handler receives the event
   * @param attempt the task attempt to signal as done
   */
  private void sendDone(MRApp app, TaskAttempt attempt) {
    app.getContext().getEventHandler().handle(
        new TaskAttemptEvent(attempt.getID(),
            TaskAttemptEventType.TA_DONE));
  }

  /**
   * Dispatches a single fetch-failure event, reported by the given reduce
   * attempt against the given map attempt, through the application's event
   * handler.
   *
   * @param app the application whose event handler receives the event
   * @param reduceAttempt the reduce attempt reporting the failure
   * @param mapAttempt the map attempt whose output could not be fetched
   */
  private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
      TaskAttempt mapAttempt) {
    app.getContext().getEventHandler().handle(
        new JobTaskAttemptFetchFailureEvent(
            reduceAttempt.getID(),
            Arrays.asList(mapAttempt.getID())));
  }
}