/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.base.Supplier;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.Event;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
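/**
* Tests the MapReduce Application Master's ability to recover completed
* task state from job history across AM restarts: crashes with and without
* reducers, custom output committers, spill encryption, speculative
* attempts, missing shuffle secrets, and replay of recorded task attempts.
*/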
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
private static Path outputDir = new Path(new File("target",
TestRecovery.class.getName()).getAbsolutePath() +
Path.SEPARATOR + "out");
private static String partFile = "part-r-00000";
private Text key1 = new Text("key1");
private Text key2 = new Text("key2");
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
/**
* AM with 2 maps and 1 reduce. For the 1st map, one attempt fails, one attempt
* completely disappears because of a failed launch, one attempt gets killed and
* one attempt succeeds. AM crashes after the first task finishes, then
* recovers completely and succeeds in the second generation.
*
* @throws Exception
*/
@Test
public void testCrashed() throws Exception {
int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long jobStartTime = job.getReport().getStartTime();
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
/////////// Play some games with the TaskAttempts of the first task //////
//send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_FAILMSG));
app.waitForState(task1Attempt1, TaskAttemptState.FAILED);
int timeOut = 0;
while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
Thread.sleep(2000);
LOG.info("Waiting for next attempt to start");
}
Assert.assertEquals(2, mapTask1.getAttempts().size());
Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
itr.next();
TaskAttempt task1Attempt2 = itr.next();
// wait for the second task attempt to be assigned.
waitForContainerAssignment(task1Attempt2);
// This attempt will automatically fail because of the way ContainerLauncher
// is set up
// This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task1Attempt2.getID(),
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
app.waitForState(task1Attempt2, TaskAttemptState.FAILED);
timeOut = 0;
while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) {
Thread.sleep(2000);
LOG.info("Waiting for next attempt to start");
}
Assert.assertEquals(3, mapTask1.getAttempts().size());
itr = mapTask1.getAttempts().values().iterator();
itr.next();
itr.next();
TaskAttempt task1Attempt3 = itr.next();
app.waitForState(task1Attempt3, TaskAttemptState.RUNNING);
//send the kill signal to the 1st map 3rd attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt3.getID(),
TaskAttemptEventType.TA_KILL));
app.waitForState(task1Attempt3, TaskAttemptState.KILLED);
timeOut = 0;
while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) {
Thread.sleep(2000);
LOG.info("Waiting for next attempt to start");
}
Assert.assertEquals(4, mapTask1.getAttempts().size());
itr = mapTask1.getAttempts().values().iterator();
itr.next();
itr.next();
itr.next();
TaskAttempt task1Attempt4 = itr.next();
app.waitForState(task1Attempt4, TaskAttemptState.RUNNING);
//send the done signal to the 1st map 4th attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt4.getID(),
TaskAttemptEventType.TA_DONE));
/////////// End of games with the TaskAttempts of the first task //////
//wait for first map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
long task1StartTime = mapTask1.getReport().getStartTime();
long task1FinishTime = mapTask1.getReport().getFinishTime();
//stop the app
app.stop();
//rerun
//in the rerun the 1st map will be recovered from the previous run
long am2StartTimeEst = System.currentTimeMillis();
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask = it.next();
// first map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.RUNNING);
task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask2, TaskState.SUCCEEDED);
//wait for reduce to be running before sending done
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduceTask.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
Assert.assertEquals("Job Start time not correct",
jobStartTime, job.getReport().getStartTime());
Assert.assertEquals("Task Start time not correct",
task1StartTime, mapTask1.getReport().getStartTime());
Assert.assertEquals("Task Finish time not correct",
task1FinishTime, mapTask1.getReport().getFinishTime());
Assert.assertEquals(2, job.getAMInfos().size());
int attemptNum = 1;
// Verify AMInfo
for (AMInfo amInfo : job.getAMInfos()) {
Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
.getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
}
long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
&& am1StartTimeReal <= am2StartTimeEst);
Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
&& am2StartTimeReal <= System.currentTimeMillis());
// TODO Add verification of additional data from jobHistory - whatever was
// available in the failed attempt should be available here
}
/**
* Wait for a task attempt to be assigned a container.
* @param task1Attempt2 the task attempt whose container assignment to wait for
* @throws TimeoutException if the wait times out
* @throws InterruptedException if the wait is interrupted
*/
public static void waitForContainerAssignment(final TaskAttempt task1Attempt2)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return task1Attempt2.getAssignedContainerID() != null;
}
}, 10, 10000);
}
/**
* AM with 3 maps and 0 reduces. AM crashes after the first two tasks finish,
* then recovers completely and succeeds in the second generation.
*
* @throws Exception
*/
@Test
public void testCrashOfMapsOnlyJob() throws Exception {
int runCount = 0;
MRApp app =
new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
// all maps would be running
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task mapTask3 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
TaskAttempt task1Attempt =
mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt =
mapTask2.getAttempts().values().iterator().next();
TaskAttempt task3Attempt =
mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 1st two maps
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
// wait for the first two map tasks to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// stop the app
app.stop();
// rerun
// in the rerun the 1st two maps will be recovered from the previous run
app =
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
// Set num-reduces explicitly in conf as recovery logic depends on it.
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
mapTask3 = it.next();
// first two maps will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
app.waitForState(mapTask3, TaskState.RUNNING);
task3Attempt = mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 3rd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
// wait to get it completed
app.waitForState(mapTask3, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
/**
* This class provides a custom implementation of the output committer's
* isRecoverySupported and abortJob methods; whether recovery is supported
* is determined by a config property.
*/
public static class TestFileOutputCommitter extends
org.apache.hadoop.mapred.FileOutputCommitter {
private boolean abortJobCalled;
@Override
public boolean isRecoverySupported(
org.apache.hadoop.mapred.JobContext jobContext) {
boolean isRecoverySupported = false;
if (jobContext != null && jobContext.getConfiguration() != null) {
isRecoverySupported = jobContext.getConfiguration().getBoolean(
"want.am.recovery", false);
}
return isRecoverySupported;
}
@Override
public void abortJob(JobContext context, int runState) throws IOException {
super.abortJob(context, runState);
this.abortJobCalled = true;
}
private boolean isAbortJobCalled() {
return this.abortJobCalled;
}
}
/**
* This test case primarily verifies that recovery is controlled through a
* config property. In this case, recovery is turned ON. AM with 3 maps and
* 0 reduces. AM crashes after the first two tasks finish, then recovers
* completely and succeeds in the second generation.
*
* @throws Exception
*/
@Test
public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean("want.am.recovery", true);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
// all maps would be running
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task mapTask3 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
.next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
.next();
TaskAttempt task3Attempt = mapTask3.getAttempts().values().iterator()
.next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 1st two maps
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
// wait for the first two map tasks to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// stop the app
app.stop();
// rerun
// in the rerun the 1st two maps will be recovered from the previous run
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean("want.am.recovery", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
// Set num-reduces explicitly in conf as recovery logic depends on it.
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
mapTask3 = it.next();
// first two maps will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
app.waitForState(mapTask3, TaskState.RUNNING);
task3Attempt = mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 3rd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
.next().getID(), TaskAttemptEventType.TA_DONE));
// wait to get it completed
app.waitForState(mapTask3, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
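/**
* AM with 1 map and 1 reduce. AM crashes after the map finishes. Because
* intermediate data (spill) encryption is enabled, the succeeded map must
* NOT be recovered and the whole job reruns in the second generation.
*
* @throws Exception
*/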
@Test
public void testRecoveryWithSpillEncryption() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
// run the MR job at the first attempt
Job jobAttempt1 = app.submit(conf);
app.waitForState(jobAttempt1, JobState.RUNNING);
Iterator<Task> tasks = jobAttempt1.getTasks().values().iterator();
// finish the map task but not the reduce task
Task mapper = tasks.next();
app.waitForState(mapper, TaskState.RUNNING);
TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
app.waitForState(mapper, TaskState.SUCCEEDED);
// crash the first attempt of the MR job
app.stop();
// run the MR job again at the second attempt
app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
++runCount);
Job jobAttempt2 = app.submit(conf);
Assert.assertTrue("Recovery from previous job attempt is processed even " +
"though intermediate data encryption is enabled.", !app.recovered());
// The map task that succeeded in the previous job attempt will not be
// recovered because spill encryption is enabled.
// Let's finish the job at the second attempt and verify its completion.
app.waitForState(jobAttempt2, JobState.RUNNING);
tasks = jobAttempt2.getTasks().values().iterator();
mapper = tasks.next();
Task reducer = tasks.next();
// finish the map task first
app.waitForState(mapper, TaskState.RUNNING);
mapAttempt = mapper.getAttempts().values().iterator().next();
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
app.waitForState(mapper, TaskState.SUCCEEDED);
// then finish the reduce task
TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next();
app.waitForState(redAttempt, TaskAttemptState.RUNNING);
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE));
app.waitForState(reducer, TaskState.SUCCEEDED);
// verify that the job succeeds at the 2nd attempt
app.waitForState(jobAttempt2, JobState.SUCCEEDED);
}
/**
* This test case primarily verifies that recovery is controlled through a
* config property. In this case, recovery is turned OFF. AM with 3 maps and
* 0 reduces. AM crashes after the first two tasks finish; recovery fails, so
* the job has to rerun fully in the second generation, where it succeeds.
*
* @throws Exception
*/
@Test
public void testRecoveryFailsUsingCustomOutputCommitter() throws Exception {
int runCount = 0;
MRApp app =
new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
++runCount);
Configuration conf = new Configuration();
conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean("want.am.recovery", false);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
// all maps would be running
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task mapTask3 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
TaskAttempt task1Attempt =
mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt =
mapTask2.getAttempts().values().iterator().next();
TaskAttempt task3Attempt =
mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to the 1st two maps
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
// wait for the first two map tasks to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// stop the app
app.stop();
// rerun
// in the rerun the 1st two maps will NOT be recovered from the previous run
app =
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean("want.am.recovery", false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
// Set num-reduces explicitly in conf as recovery logic depends on it.
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
mapTask3 = it.next();
// first two maps will NOT be recovered, need to send done to them
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
app.waitForState(mapTask3, TaskState.RUNNING);
task3Attempt = mapTask3.getAttempts().values().iterator().next();
// before sending the TA_DONE event, make sure the attempt has come to
// RUNNING state
app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
// send the done signal to all 3 map tasks
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapTask1.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapTask2.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
.getID(), TaskAttemptEventType.TA_DONE));
// wait to get it completed
app.waitForState(mapTask3, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
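/**
* AM with 2 maps and 1 reduce. AM crashes after the first map finishes,
* recovers it, crashes again after the second map finishes, and in the
* third generation recovers both maps and succeeds.
*
* @throws Exception
*/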
@Test
public void testMultipleCrashes() throws Exception {
int runCount = 0;
MRApp app =
new MRAppWithHistory(2, 1, false, this.getClass().getName(), true,
++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
// the reduce must be in RUNNING state
Assert.assertEquals("Reduce Task state not correct",
TaskState.RUNNING, reduceTask.getReport().getTaskState());
//send the done signal to the 1st map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Crash the app
app.stop();
//rerun
//in the rerun the 1st map will be recovered from the previous run
app =
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask = it.next();
// first map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.RUNNING);
task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// Crash the app again.
app.stop();
//rerun
//in the rerun the 1st and 2nd maps will be recovered from the previous run
app =
new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask = it.next();
// The maps will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.SUCCEEDED);
//wait for reduce to be running before sending done
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduceTask.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
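/**
* AM with 1 map and 2 reduces. AM crashes after the map and the first
* reduce finish. In the second generation both are recovered, including
* the map's shuffle port and the first reduce's committed output, and the
* job succeeds.
*
* @throws Exception
*/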
@Test
public void testOutputRecovery() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
//stop the app before the job completes.
app.stop();
//rerun
//in the rerun the map will be recovered from the previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
reduceTask1 = it.next();
Task reduceTask2 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask2, TaskState.RUNNING);
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
.iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd reduce task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
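/**
* With recovery disabled, a rerun must clean up the previous job attempt's
* output: the committer's abortJob() is expected to have been called.
*
* @throws Exception
*/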
@Test
public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
//stop the app before the job completes.
app.stop();
app.close();
//rerun
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
TestFileOutputCommitter committer =
(TestFileOutputCommitter) app.getCommitter();
assertTrue("committer.abortJob() has not been called",
committer.isAbortJobCalled());
app.close();
}
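/**
* With recovery enabled and a committer that supports it, a rerun must
* keep the previous job attempt's output: the committer's abortJob() is
* expected NOT to have been called.
*
* @throws Exception
*/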
@Test
public void testPreviousJobIsNotCleanedWhenRecovery()
throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setClass("mapred.output.committer.class",
TestFileOutputCommitter.class,
org.apache.hadoop.mapred.OutputCommitter.class);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
// TestFileOutputCommitter supports recovery if want.am.recovery=true
conf.setBoolean("want.am.recovery", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
//stop the app before the job completes.
app.stop();
app.close();
//rerun
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
TestFileOutputCommitter committer =
(TestFileOutputCommitter) app.getCommitter();
assertFalse("committer.abortJob() has been called",
committer.isAbortJobCalled());
app.close();
}
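/**
* AM with 2 maps and 1 reduce. AM crashes after the first map finishes,
* having written bad map-side output that must not appear in the final
* result. The second generation recovers the first map (including its
* shuffle port), runs the remaining tasks, and validates the output.
*
* @throws Exception
*/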
@Test
public void testOutputRecoveryMapsOnly() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
// write output corresponding to map1 (this is just to validate that it is
// not included in the final output)
writeBadOutput(task1Attempt1, conf);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
//stop the app before the job completes.
app.stop();
//rerun
//in the rerun the map will be recovered from the previous run
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask1 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(mapTask2, TaskState.RUNNING);
TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
.next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task2Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task2Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
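/**
* Same output-recovery scenario as testOutputRecovery, but using the old
* (mapred) API and therefore the old-style output committer.
*
* @throws Exception
*/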
@Test
public void testRecoveryWithOldCommiter() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", false);
conf.setBoolean("mapred.reducer.new-api", false);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
//stop the app before the job completes.
app.stop();
//rerun
//in the rerun the map will be recovered from the previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", false);
conf.setBoolean("mapred.reducer.new-api", false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
reduceTask1 = it.next();
Task reduceTask2 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask2, TaskState.RUNNING);
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
.iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd reduce task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
/**
* AM with 2 maps and 1 reduce. A speculative attempt is launched for the
* 1st map, and the original attempt succeeds. AM crashes after the first
* task finishes, then recovers completely and succeeds in the second
* generation.
*
* @throws Exception
*/
@Test
public void testSpeculative() throws Exception {
int runCount = 0;
long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long jobStartTime = job.getReport().getStartTime();
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
// Launch a Speculative Task for the first Task
app.getContext().getEventHandler().handle(
new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
int timeOut = 0;
while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
Thread.sleep(1000);
LOG.info("Waiting for next attempt to start");
}
Iterator<TaskAttempt> t1it = mapTask1.getAttempts().values().iterator();
TaskAttempt task1Attempt1 = t1it.next();
TaskAttempt task1Attempt2 = t1it.next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
// wait for the second task attempt to be assigned.
waitForContainerAssignment(task1Attempt2);
ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
LOG.info(t1a2contId.toString());
LOG.info(task1Attempt1.getID().toString());
LOG.info(task1Attempt2.getID().toString());
// Launch container for speculative attempt
app.getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the map 1 attempt 1
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
//wait for first map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
long task1StartTime = mapTask1.getReport().getStartTime();
long task1FinishTime = mapTask1.getReport().getFinishTime();
//stop the app
app.stop();
//rerun
//in the rerun the 1st map will be recovered from the previous run
long am2StartTimeEst = System.currentTimeMillis();
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask = it.next();
// first map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
app.waitForState(mapTask2, TaskState.RUNNING);
task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask2, TaskState.SUCCEEDED);
//wait for reduce to be running before sending done
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduceTask.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
Assert.assertEquals("Job Start time not correct",
jobStartTime, job.getReport().getStartTime());
Assert.assertEquals("Task Start time not correct",
task1StartTime, mapTask1.getReport().getStartTime());
Assert.assertEquals("Task Finish time not correct",
task1FinishTime, mapTask1.getReport().getFinishTime());
Assert.assertEquals(2, job.getAMInfos().size());
int attemptNum = 1;
// Verify AMInfo
for (AMInfo amInfo : job.getAMInfos()) {
Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
.getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
}
long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
&& am1StartTimeReal <= am2StartTimeEst);
Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
&& am2StartTimeReal <= System.currentTimeMillis());
}
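/**
* AM with 2 maps and 1 reduce, submitted without a shuffle secret in the
* job credentials. Since the secret has to be rolled per app attempt, the
* map that succeeded in the first generation must NOT be recovered and is
* rerun in the second generation.
*
* @throws Exception
*/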
@Test(timeout=30000)
public void testRecoveryWithoutShuffleSecret() throws Exception {
int runCount = 0;
MRApp app = new MRAppNoShuffleSecret(2, 1, false,
this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the 1st map attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
//stop the app
app.stop();
//in recovery the 1st map should NOT be recovered from previous run
//since the shuffle secret was not provided with the job credentials
//and had to be rolled per app attempt
app = new MRAppNoShuffleSecret(2, 1, false,
this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask = it.next();
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE event, make sure the attempt has come to
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask2, TaskState.SUCCEEDED);
//verify first map task is still running
app.waitForState(mapTask1, TaskState.RUNNING);
//send the done signal to the 1st map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask1.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask1, TaskState.SUCCEEDED);
//wait for reduce to be running before sending done
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduceTask.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
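/**
* Replays a task recorded as SUCCEEDED, with one successful and one failed
* attempt, through a mocked MapTaskImpl and verifies the recovered task
* and attempt states as well as the emitted job history events.
*/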
@Test
public void testRecoverySuccessAttempt() {
LOG.info("--- START: testRecoverySuccessAttempt ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.SUCCEEDED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.TASK_FINISHED);
recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 1L);
}
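/**
 * Verify that a task whose recorded attempts all failed is recovered
 * as FAILED, with the failed-map counter incremented for each attempt.
 */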
@Test
public void testRecoveryAllFailAttempts() {
LOG.info("--- START: testRecoveryAllFailAttempts ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.TASK_FAILED);
recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 2L);
}
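/**
 * Verify that a task recorded as SUCCEEDED whose attempts all failed is
 * not treated as complete: recovery must schedule a fresh attempt and
 * leave the task RUNNING.
 */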
@Test
public void testRecoveryTaskSuccessAllAttemptsFail() {
LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
// check for one new attempt launched since successful attempt not found
TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
finalAttemptStates.put(taId3, TaskAttemptState.NEW);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
arg, jobHistoryEvents, 2L, 2L);
}
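/**
 * Verify that a task recorded as SUCCEEDED with only successful attempts
 * is recovered as SUCCEEDED with no failed-map counter updates.
 */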
@Test
public void testRecoveryTaskSuccessAllAttemptsSucceed() {
LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.SUCCEEDED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.SUCCEEDED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
jobHistoryEvents.add(EventType.TASK_FINISHED);
recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 0L);
}
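/**
 * Verify that a task whose recorded attempts were all killed is
 * recovered as KILLED and emits a TASK_FAILED history event.
 */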
@Test
public void testRecoveryAllAttemptsKilled() {
LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.KILLED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.KILLED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
jobHistoryEvents.add(EventType.TASK_FAILED);
recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 0L);
}
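/**
 * Common assertions for the recovery tests above: checks the task's final
 * state, the state of each recovered attempt, the sequence of job history
 * events, and the launched/failed map counter totals captured from the
 * mock event handler.
 */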
private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
long expectedMapLaunches, long expectedFailedMaps) {
assertEquals("Final State of Task", finalState, checkTask.getState());
Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
checkTask.getAttempts();
assertEquals("Expected Number of Task Attempts",
finalAttemptStates.size(), recoveredAttempts.size());
for (TaskAttemptID taID : finalAttemptStates.keySet()) {
assertEquals("Expected Task Attempt State",
finalAttemptStates.get(taID),
recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
}
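// replay the captured events in order, tallying counter updates and
// matching job history events against the expected sequence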
Iterator<Event> ie = arg.getAllValues().iterator();
int eventNum = 0;
long totalLaunchedMaps = 0;
long totalFailedMaps = 0;
boolean jobTaskEventReceived = false;
while (ie.hasNext()) {
Object current = ie.next();
++eventNum;
LOG.info(eventNum + " " + current.getClass().getName());
if (current instanceof JobHistoryEvent) {
JobHistoryEvent jhe = (JobHistoryEvent) current;
LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
jhe.getHistoryEvent().getEventType().toString() + " " +
jhe.getJobID());
assertEquals(expectedJobHistoryEvents.get(0),
jhe.getHistoryEvent().getEventType());
expectedJobHistoryEvents.remove(0);
} else if (current instanceof JobCounterUpdateEvent) {
JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
boolean containsUpdates = jcue.getCounterUpdates().size() > 0;
// a JobCounterUpdateEvent emitted on TaskAttempt recovery contains no
// counter updates, so only inspect events that carry updates
if (containsUpdates) {
LOG.info("JobCounterUpdateEvent "
+ jcue.getCounterUpdates().get(0).getCounterKey()
+ " " + jcue.getCounterUpdates().get(0).getIncrementValue());
if (jcue.getCounterUpdates().get(0).getCounterKey() ==
JobCounter.NUM_FAILED_MAPS) {
totalFailedMaps += jcue.getCounterUpdates().get(0)
.getIncrementValue();
} else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
JobCounter.TOTAL_LAUNCHED_MAPS) {
totalLaunchedMaps += jcue.getCounterUpdates().get(0)
.getIncrementValue();
}
}
} else if (current instanceof JobTaskEvent) {
JobTaskEvent jte = (JobTaskEvent) current;
assertEquals(finalState, jte.getState());
jobTaskEventReceived = true;
}
}
assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
assertEquals("Did not process all expected JobHistoryEvents",
0, expectedJobHistoryEvents.size());
assertEquals("Expected Map Launches",
expectedMapLaunches, totalLaunchedMaps);
assertEquals("Expected Failed Maps",
expectedFailedMaps, totalFailedMaps);
}
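/**
 * Builds a MapTaskImpl wired to mock collaborators so that recovery can
 * be driven directly via TaskRecoverEvent without running a full app.
 */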
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
ApplicationId appId = ApplicationId.newInstance(clusterTimestamp, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
int partitions = 2;
Path remoteJobConfFile = mock(Path.class);
JobConf conf = new JobConf();
TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
Token<JobTokenIdentifier> jobToken =
(Token<JobTokenIdentifier>) mock(Token.class);
Credentials credentials = null;
Clock clock = SystemClock.getInstance();
int appAttemptId = 3;
MRAppMetrics metrics = mock(MRAppMetrics.class);
Resource minContainerRequirements = mock(Resource.class);
when(minContainerRequirements.getMemorySize()).thenReturn(1000L);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
AppContext appContext = mock(AppContext.class);
when(appContext.getClusterInfo()).thenReturn(clusterInfo);
TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
eh, remoteJobConfFile, conf,
taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
appAttemptId, metrics, appContext);
return mapTask;
}
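/**
 * Creates a mock TaskAttemptInfo in the given state, with placeholder
 * host/port/rack details and timestamps derived from the current time.
 */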
private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
TaskAttemptState tas) {
ContainerId ci = mock(ContainerId.class);
Counters counters = mock(Counters.class);
TaskType tt = TaskType.MAP;
long finishTime = System.currentTimeMillis();
TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
when(mockTAinfo.getAttemptId()).thenReturn(tai);
when(mockTAinfo.getContainerId()).thenReturn(ci);
when(mockTAinfo.getCounters()).thenReturn(counters);
when(mockTAinfo.getError()).thenReturn("");
when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
when(mockTAinfo.getHostname()).thenReturn("localhost");
when(mockTAinfo.getHttpPort()).thenReturn(23);
when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
when(mockTAinfo.getPort()).thenReturn(24);
when(mockTAinfo.getRackname()).thenReturn("defaultRack");
when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
when(mockTAinfo.getShufflePort()).thenReturn(25);
when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
when(mockTAinfo.getStartTime()).thenReturn(finishTime - 10000L);
when(mockTAinfo.getState()).thenReturn("task in progress");
when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
when(mockTAinfo.getTaskType()).thenReturn(tt);
when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
return mockTAinfo;
}
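/**
 * Writes a record sequence that validateOutput() will reject, then
 * commits the task output via the configured OutputCommitter.
 */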
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key2, val2);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val2);
theRecordWriter.write(nullWritable, val1);
theRecordWriter.write(key1, nullWritable);
theRecordWriter.write(key2, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key1, val1);
} finally {
theRecordWriter.close(tContext);
}
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}
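/**
 * Writes the record sequence that validateOutput() expects
 * (TextOutputFormat drops null and NullWritable keys/values), then
 * commits the task output via the configured OutputCommitter.
 */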
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(tContext);
}
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}
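/**
 * Reads the committed part file and checks it against the expected
 * record sequence written by writeOutput().
 */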
private void validateOutput() throws IOException {
File expectedFile = new File(new Path(outputDir, partFile).toString());
StringBuilder expectedOutput = new StringBuilder();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
expectedOutput.append(val2).append("\n");
expectedOutput.append(key2).append("\n");
expectedOutput.append(key1).append("\n");
expectedOutput.append(key2).append('\t').append(val2).append("\n");
String output = slurp(expectedFile);
Assert.assertEquals(expectedOutput.toString(), output);
}
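/**
 * Reads an entire file into a UTF-8 string.
 */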
public static String slurp(File f) throws IOException {
int len = (int) f.length();
byte[] buf = new byte[len];
FileInputStream in = new FileInputStream(f);
String contents = null;
try {
int off = 0;
// read() may return fewer bytes than requested, so loop until the
// whole file has been consumed
while (off < len) {
int numRead = in.read(buf, off, len - off);
if (numRead < 0) {
break;
}
off += numRead;
}
contents = new String(buf, "UTF-8");
} finally {
in.close();
}
return contents;
}
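/**
 * MRApp variant that writes real job history via JobHistoryEventHandler
 * and whose launcher holds back the 2nd attempt of the first task.
 */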
static class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
}
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher() {
@Override
public void handle(ContainerLauncherEvent event) {
TaskAttemptId taskAttemptID = event.getTaskAttemptID();
// Pass everything except the 2nd attempt of the first task.
if (taskAttemptID.getId() != 1
|| taskAttemptID.getTaskId().getId() != 0) {
super.handle(event);
}
}
};
launcher.shufflePort = 5467;
return launcher;
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
getStartCount());
return eventHandler;
}
}
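/**
 * MRAppWithHistory variant that omits the shuffle secret from the job
 * credentials, forcing the secret to be rolled per app attempt.
 */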
static class MRAppNoShuffleSecret extends MRAppWithHistory {
public MRAppNoShuffleSecret(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
}
@Override
protected void initJobCredentialsAndUGI(Configuration conf) {
// do NOT put a shuffle secret in the job credentials
}
}
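/**
 * Allows the recovery tests to be run directly, outside of a JUnit runner.
 */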
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();
test.testMultipleCrashes();
test.testOutputRecovery();
test.testOutputRecoveryMapsOnly();
test.testRecoveryWithOldCommiter();
test.testSpeculative();
test.testRecoveryWithoutShuffleSecret();
test.testRecoverySuccessAttempt();
test.testRecoveryAllFailAttempts();
test.testRecoveryTaskSuccessAllAttemptsFail();
test.testRecoveryTaskSuccessAllAttemptsSucceed();
test.testRecoveryAllAttemptsKilled();
}
}