blob: 07706b10447e91343de1114ec3ed4a9452ce8f73 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
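/**
 * Tests failure handling in the job setup and job cleanup tasks: an output
 * committer that throws an exception, a setup/cleanup attempt that is killed
 * from the command line, and a setup/cleanup attempt that was running on a
 * task tracker which gets lost.
 */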
public class TestSetupAndCleanupFailure extends TestCase {
  // Input and output directories handed to every job launched by this test.
  final Path inDir = new Path("./input");
  final Path outDir = new Path("./output");
  // Files whose appearance on the file system releases the waiting setupJob
  // and commitJob of CommitterWithLongSetupAndCommit.
  static Path setupSignalFile = new Path("/setup-signal");
  static Path cleanupSignalFile = new Path("/cleanup-signal");

  /** Committer whose setupJob always throws an IOException. */
  static class CommitterWithFailSetup extends FileOutputCommitter {
    @Override
    public void setupJob(JobContext context) throws IOException {
      throw new IOException();
    }
  }

  /** Committer whose commitJob always throws an IOException. */
  static class CommitterWithFailCommit extends FileOutputCommitter {
    @Override
    public void commitJob(JobContext context) throws IOException {
      throw new IOException();
    }
  }

  /**
   * Committer whose setupJob and commitJob block until the corresponding
   * signal file exists on the job's file system, and only then delegate to
   * FileOutputCommitter.
   */
  static class CommitterWithLongSetupAndCommit extends FileOutputCommitter {

    /**
     * Polls every 100 ms until signalFile exists on fs. If the waiting
     * thread is interrupted the wait ends early.
     *
     * @param fs file system to look at
     * @param signalFile file to wait for
     * @throws IOException if the existence check fails
     */
    private void waitForSignalFile(FileSystem fs, Path signalFile)
        throws IOException {
      while (!fs.exists(signalFile)) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException ie) {
          // Thread.sleep clears the interrupt status when it throws; set it
          // again so that the code running after this wait can still see
          // that the thread was interrupted.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
      waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile);
      super.setupJob(context);
    }

    @Override
    public void commitJob(JobContext context) throws IOException {
      waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
      super.commitJob(context);
    }
  }

  /**
   * Sleeps for the given number of milliseconds between two polls. An
   * interrupt only cuts the sleep short; it is deliberately ignored so that
   * the calling loop keeps waiting for its condition.
   *
   * @param millis time to sleep, in milliseconds
   */
  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ignored) {
      // deliberately ignored, see the method comment
    }
  }

  /**
   * Blocks until one of the attempts of the given tips reports the RUNNING
   * state and returns the id of that attempt. The tips are re-scanned every
   * 10 ms; there is no timeout. Callers pass the setup or the cleanup tips
   * of a job, among which only one task is expected to be running.
   *
   * @param tips tasks whose attempt statuses are scanned
   * @return id of the first attempt found in RUNNING state
   */
  private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) {
    while (true) {
      for (TaskInProgress tip : tips) {
        for (TaskStatus status : tip.getTaskStatuses()) {
          if (status.getRunState() == TaskStatus.State.RUNNING) {
            // found it, no need to sleep once more before returning
            return status.getTaskID();
          }
        }
      }
      pause(10);
    }
  }

  /**
   * Runs a job with the given failing committer and checks that the job
   * ends in the FAILED state.
   *
   * @param theClass committer class that throws during setup or commit
   * @param jobConf configuration of the job to run
   * @throws IOException if running or querying the job fails
   */
  private void testFailCommitter(Class<? extends OutputCommitter> theClass,
                                 JobConf jobConf)
      throws IOException {
    jobConf.setOutputCommitter(theClass);
    RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
    // wait for the job to finish.
    job.waitForCompletion();
    assertEquals(JobStatus.FAILED, job.getJobState());
  }

  /**
   * Launches a job that uses CommitterWithLongSetupAndCommit as committer
   * and waits (polling every 10 ms) until the job tracker reports the job
   * as inited.
   *
   * @param mr cluster to run the job on
   * @return handle of the launched job
   * @throws IOException if the job cannot be launched
   */
  private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr)
      throws IOException {
    // launch job with waiting setup/cleanup
    JobConf jobConf = mr.createJobConf();
    jobConf.setOutputCommitter(CommitterWithLongSetupAndCommit.class);
    RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
    JobInProgress jip = jt.getJob(job.getID());
    while (!jip.inited()) {
      pause(10);
    }
    return job;
  }

  /**
   * Creates the given signal file on the DFS cluster, which lets the
   * committer waiting for that file proceed.
   *
   * @param dfs cluster on which the file is created
   * @param signalFile file to create
   * @throws IOException if the file cannot be written
   */
  private void createSignalFile(MiniDFSCluster dfs, Path signalFile)
      throws IOException {
    UtilsForTests.writeFile(dfs.getNameNode(),
                            dfs.getFileSystem().getConf(),
                            signalFile, (short)3);
  }

  /**
   * Kills the given attempt either from the command line or by stopping the
   * tracker it runs on.
   *
   * @param mr cluster the job runs on
   * @param job job the attempt belongs to
   * @param taskid attempt to kill
   * @param jt job tracker of the cluster
   * @param commandLineKill if true, kill from the command line;
   *                        else, lose the tracker
   * @throws IOException if the command-line kill fails
   */
  private void killTask(MiniMRCluster mr, RunningJob job,
                        TaskAttemptID taskid, JobTracker jt,
                        boolean commandLineKill)
      throws IOException {
    if (commandLineKill) {
      killTaskFromCommandLine(job, taskid, jt);
    } else {
      killTaskWithLostTracker(mr, taskid);
    }
  }

  /**
   * Tests setup and cleanup attempts getting killed, either from the
   * command line or because their tracker is lost. The job is expected to
   * succeed nevertheless, with the two killed attempts reported as KILLED.
   *
   * @param mr cluster to run the job on
   * @param dfs cluster on which the signal files are created
   * @param commandLineKill if true, test with command-line kill
   *                        else, test with lost tracker
   * @throws IOException if talking to the job or to the DFS fails
   */
  private void testSetupAndCleanupKill(MiniMRCluster mr,
                                       MiniDFSCluster dfs,
                                       boolean commandLineKill)
      throws IOException {
    // launch job with waiting setup/cleanup
    RunningJob job = launchJobWithWaitingSetupAndCleanup(mr);

    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
    JobInProgress jip = jt.getJob(job.getID());

    // kill the setup attempt that is currently waiting for its signal file
    TaskAttemptID setupID = getRunningTaskID(jip.getTasks(TaskType.JOB_SETUP));
    killTask(mr, job, setupID, jt, commandLineKill);
    // signal the setup to complete
    createSignalFile(dfs, setupSignalFile);

    // wait for maps and reduces to complete
    while (job.reduceProgress() != 1.0f) {
      pause(100);
    }

    // kill the cleanup attempt that is currently waiting for its signal file
    TaskAttemptID cleanupID =
      getRunningTaskID(jip.getTasks(TaskType.JOB_CLEANUP));
    killTask(mr, job, cleanupID, jt, commandLineKill);
    // signal the cleanup to complete
    createSignalFile(dfs, cleanupSignalFile);

    // wait for the job to finish.
    job.waitForCompletion();

    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
    assertEquals(TaskStatus.State.KILLED,
                 jt.getTaskStatus(setupID).getRunState());
    assertEquals(TaskStatus.State.KILLED,
                 jt.getTaskStatus(cleanupID).getRunState());
  }

  /**
   * Kills the attempt through RunningJob.killTask and waits (polling every
   * 10 ms, without timeout) until the job tracker reports it as KILLED.
   *
   * @param job job the attempt belongs to
   * @param taskid attempt to kill
   * @param jt job tracker that is asked for the attempt status
   * @throws IOException if the kill request fails
   */
  private void killTaskFromCommandLine(RunningJob job,
                                       TaskAttemptID taskid,
                                       JobTracker jt)
      throws IOException {
    // second argument false: the attempt is to be killed rather than failed,
    // which is the state the loop below waits for
    job.killTask(taskid, false);
    // wait till the kill happens
    while (jt.getTaskStatus(taskid).getRunState() !=
           TaskStatus.State.KILLED) {
      pause(10);
    }
  }

  /**
   * Kills the attempt by stopping the task tracker it is running on. Does
   * not wait for the job tracker to notice the loss.
   *
   * @param mr cluster owning the tracker
   * @param taskid attempt whose tracker is stopped
   */
  private void killTaskWithLostTracker(MiniMRCluster mr,
                                       TaskAttemptID taskid) {
    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
    String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
    int trackerID = mr.getTaskTrackerID(trackerName);
    assertTrue(trackerID != -1);
    mr.stopTaskTracker(trackerID);
  }

  /**
   * Brings up a DFS and an MR mini cluster and runs all scenarios on them:
   * committers failing in setup and in commit (job should cleanly fail),
   * command-line kill of the setup/cleanup attempts, and setup/cleanup
   * attempts killed because they were running on a lost tracker.
   *
   * @throws IOException if a cluster or a job cannot be started
   */
  public void testWithDFS() throws IOException {
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      final int taskTrackers = 4;
      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster(conf, 4, true, null);
      fileSys = dfs.getFileSystem();
      JobConf jtConf = new JobConf();
      jtConf.setInt(TTConfig.TT_MAP_SLOTS, 1);
      jtConf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
      jtConf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 10 * 1000);
      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
                             null, null, jtConf);
      // test setup/cleanup throwing exceptions
      testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
      testFailCommitter(CommitterWithFailCommit.class, mr.createJobConf());
      // test the command-line kill for setup/cleanup attempts.
      testSetupAndCleanupKill(mr, dfs, true);
      // remove setup/cleanup signal files, so that the committer of the
      // next job waits again
      fileSys.delete(setupSignalFile, true);
      fileSys.delete(cleanupSignalFile, true);
      // test the setup/cleanup attempts getting killed if
      // they were running on a lost tracker
      testSetupAndCleanupKill(mr, dfs, false);
    } finally {
      // The MR cluster was started on top of the DFS cluster, so it is
      // stopped first. The nested finally guarantees that the DFS cluster
      // is stopped even when stopping the MR cluster throws.
      try {
        if (mr != null) {
          mr.shutdown();
        }
      } finally {
        if (dfs != null) {
          dfs.shutdown();
        }
      }
    }
  }

  /** Runs the test outside of a JUnit runner. */
  public static void main(String[] argv) throws Exception {
    TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure();
    td.testWithDFS();
  }
}