blob: 4744035f5e269f92459dbc4d5b7b4be47ffd3335 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import java.util.Iterator;
import java.util.Map;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.junit.Test;
/**
* Tests the state machine with respect to Job/Task/TaskAttempt failure
* scenarios.
*/
public class TestFail {

  /**
   * The first attempt of the only map task is failed and the second attempt
   * is allowed to finish, so the task and the job must both succeed.
   */
  @Test
  public void testFailTask() throws Exception {
    MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
    Configuration conf = new Configuration();
    // this test requires two task attempts, but uberization overrides max to 1
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    Job job = app.submit(conf);
    app.waitForState(job, JobState.SUCCEEDED);
    Map<TaskId, Task> tasks = job.getTasks();
    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
    Task task = tasks.values().iterator().next();
    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
        task.getReport().getTaskState());
    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
    Assert.assertEquals("Num attempts is not correct", 2, attempts.size());
    // MockFirstFailingAttemptMRApp fails attempt id 0 of task 0; any other
    // attempt is sent TA_DONE. Check by attempt id rather than relying on the
    // iteration order of the map returned by getAttempts().
    for (Map.Entry<TaskAttemptId, TaskAttempt> entry : attempts.entrySet()) {
      TaskAttemptState expected = entry.getKey().getId() == 0
          ? TaskAttemptState.FAILED
          : TaskAttemptState.SUCCEEDED;
      Assert.assertEquals("Attempt state not correct", expected,
          entry.getValue().getReport().getTaskAttemptState());
    }
  }

  /**
   * One of four maps always fails (25%). A 20% map-failure tolerance must
   * fail the job; a 25% tolerance must let it succeed.
   */
  @Test
  public void testMapFailureMaxPercent() throws Exception {
    MRApp app = new MockFirstFailingTaskMRApp(4, 0);
    Job job = app.submit(createMapFailureConf(20));
    app.waitForState(job, JobState.FAILED);

    //setting the failure percentage to 25% (1/4 is 25) will
    //make the Job successful
    app = new MockFirstFailingTaskMRApp(4, 0);
    job = app.submit(createMapFailureConf(25));
    app.waitForState(job, JobState.SUCCEEDED);
  }

  /**
   * One of four reduces always fails (25%). A 20% reduce-failure tolerance
   * must fail the job; a 25% tolerance must let it succeed.
   */
  @Test
  public void testReduceFailureMaxPercent() throws Exception {
    MRApp app = new MockFirstFailingTaskMRApp(2, 4);
    Job job = app.submit(createReduceFailureConf(20));
    app.waitForState(job, JobState.FAILED);

    //setting the failure percentage to 25% (1/4 is 25) will
    //make the Job successful
    app = new MockFirstFailingTaskMRApp(2, 4);
    job = app.submit(createReduceFailureConf(25));
    app.waitForState(job, JobState.SUCCEEDED);
  }

  /**
   * Builds the configuration for {@link #testMapFailureMaxPercent()}.
   *
   * @param mapFailuresMaxPercent value for MAP_FAILURES_MAX_PERCENT
   * @return a fresh configuration allowing one attempt per task
   */
  private static Configuration createMapFailureConf(int mapFailuresMaxPercent) {
    Configuration conf = new Configuration();
    // a single attempt per task keeps the test fast
    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, mapFailuresMaxPercent);
    return conf;
  }

  /**
   * Builds the configuration for {@link #testReduceFailureMaxPercent()}.
   *
   * @param reduceFailuresMaxPercent value for REDUCE_FAILURES_MAXPERCENT
   * @return a fresh configuration allowing one attempt per task
   */
  private static Configuration createReduceFailureConf(
      int reduceFailuresMaxPercent) {
    Configuration conf = new Configuration();
    // a single attempt per task keeps the test fast
    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
    // the mock also fails map task 0 (1 of 2 maps); tolerate that so the
    // outcome depends only on the reduce failure threshold
    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);
    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT,
        reduceFailuresMaxPercent);
    return conf;
  }

  /**
   * Every attempt of the only map task times out, so after maxAttempts
   * failed attempts the task and the job must fail.
   */
  @Test
  public void testTimedOutTask() throws Exception {
    MRApp app = new TimeOutTaskMRApp(1, 0);
    Configuration conf = new Configuration();
    int maxAttempts = 2;
    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
    // disable uberization (requires entire job to be reattempted, so max for
    // subtask attempts is overridden to 1)
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    Job job = app.submit(conf);
    app.waitForState(job, JobState.FAILED);
    Map<TaskId, Task> tasks = job.getTasks();
    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
    Task task = tasks.values().iterator().next();
    Assert.assertEquals("Task state not correct", TaskState.FAILED,
        task.getReport().getTaskState());
    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
    Assert.assertEquals("Num attempts is not correct", maxAttempts,
        attempts.size());
    for (TaskAttempt attempt : attempts.values()) {
      Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
          attempt.getReport().getTaskAttemptState());
    }
  }

  /**
   * MRApp whose task attempts never report back. Its listener skips the RPC
   * server and shortens the task timeout, so attempts fail by timing out.
   */
  static class TimeOutTaskMRApp extends MRApp {
    TimeOutTaskMRApp(int maps, int reduces) {
      super(maps, reduces, false, "TimeOutTaskMRApp", true);
    }

    @Override
    protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
      //This will create the TaskAttemptListener with TaskHeartbeatHandler
      //RPC servers are not started
      //task time out is reduced
      //when attempt times out, heartbeat handler will send the lost event
      //leading to Attempt failure
      return new TaskAttemptListenerImpl(getContext(), null) {
        @Override
        public void startRpcServer() {
          // intentionally not started in this test
        }

        @Override
        public void stopRpcServer() {
          // nothing was started
        }

        @Override
        public void init(Configuration conf) {
          // note: mutates the passed-in configuration before delegating
          conf.setInt("mapreduce.task.timeout", 1 * 1000);//reduce timeout
          super.init(conf);
        }
      };
    }
  }

  /** MRApp that fails every attempt of task 0 and completes all others. */
  static class MockFirstFailingTaskMRApp extends MRApp {
    MockFirstFailingTaskMRApp(int maps, int reduces) {
      super(maps, reduces, true, "MockFirstFailingTaskMRApp", true);
    }

    @Override
    protected void attemptLaunched(TaskAttemptId attemptID) {
      // check if it is first task; fail all of its attempts
      TaskAttemptEventType type = attemptID.getTaskId().getId() == 0
          ? TaskAttemptEventType.TA_FAILMSG
          : TaskAttemptEventType.TA_DONE;
      getContext().getEventHandler().handle(
          new TaskAttemptEvent(attemptID, type));
    }
  }

  /** MRApp that fails only attempt 0 of task 0 and completes all others. */
  static class MockFirstFailingAttemptMRApp extends MRApp {
    MockFirstFailingAttemptMRApp(int maps, int reduces) {
      super(maps, reduces, true, "MockFirstFailingAttemptMRApp", true);
    }

    @Override
    protected void attemptLaunched(TaskAttemptId attemptID) {
      // check if it is first task's first attempt
      boolean firstAttemptOfFirstTask =
          attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0;
      TaskAttemptEventType type = firstAttemptOfFirstTask
          ? TaskAttemptEventType.TA_FAILMSG
          : TaskAttemptEventType.TA_DONE;
      getContext().getEventHandler().handle(
          new TaskAttemptEvent(attemptID, type));
    }
  }

  public static void main(String[] args) throws Exception {
    TestFail t = new TestFail();
    t.testFailTask();
    t.testTimedOutTask();
    t.testMapFailureMaxPercent();
    t.testReduceFailureMaxPercent();
  }
}