| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.mapred; |
| |
| import java.io.IOException; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.mapred.lib.IdentityMapper; |
| import org.apache.hadoop.mapred.lib.IdentityReducer; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; |
| |
| /** |
| * Tests various failures in setup/cleanup of job, like |
| * throwing exception, command line kill and lost tracker |
| */ |
| public class TestSetupAndCleanupFailure extends TestCase { |
| |
| final Path inDir = new Path("./input"); |
| final Path outDir = new Path("./output"); |
| static Path setupSignalFile = new Path("/setup-signal"); |
| static Path cleanupSignalFile = new Path("/cleanup-signal"); |
| |
| // Commiter with setupJob throwing exception |
| static class CommitterWithFailSetup extends FileOutputCommitter { |
| @Override |
| public void setupJob(JobContext context) throws IOException { |
| throw new IOException(); |
| } |
| } |
| |
| // Commiter with commitJob throwing exception |
| static class CommitterWithFailCommit extends FileOutputCommitter { |
| @Override |
| public void commitJob(JobContext context) throws IOException { |
| throw new IOException(); |
| } |
| } |
| |
| // Committer waits for a file to be created on dfs. |
| static class CommitterWithLongSetupAndCommit extends FileOutputCommitter { |
| |
| private void waitForSignalFile(FileSystem fs, Path signalFile) |
| throws IOException { |
| while (!fs.exists(signalFile)) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ie) { |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public void setupJob(JobContext context) throws IOException { |
| waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile); |
| super.setupJob(context); |
| } |
| |
| @Override |
| public void commitJob(JobContext context) throws IOException { |
| waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile); |
| super.commitJob(context); |
| } |
| } |
| |
| // Among these tips only one of the tasks will be running, |
| // get the taskid for that task |
| private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) { |
| TaskAttemptID taskid = null; |
| while (taskid == null) { |
| for (TaskInProgress tip :tips) { |
| TaskStatus[] statuses = tip.getTaskStatuses(); |
| for (TaskStatus status : statuses) { |
| if (status.getRunState() == TaskStatus.State.RUNNING) { |
| taskid = status.getTaskID(); |
| break; |
| } |
| } |
| if (taskid != null) break; |
| } |
| try { |
| Thread.sleep(10); |
| } catch (InterruptedException ie) {} |
| } |
| return taskid; |
| } |
| |
| // Tests the failures in setup/cleanup job. Job should cleanly fail. |
| private void testFailCommitter(Class<? extends OutputCommitter> theClass, |
| JobConf jobConf) |
| throws IOException { |
| jobConf.setOutputCommitter(theClass); |
| RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir); |
| // wait for the job to finish. |
| job.waitForCompletion(); |
| assertEquals(JobStatus.FAILED, job.getJobState()); |
| } |
| |
| // launch job with CommitterWithLongSetupAndCleanup as committer |
| // and wait till the job is inited. |
| private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr) |
| throws IOException { |
| // launch job with waiting setup/cleanup |
| JobConf jobConf = mr.createJobConf(); |
| jobConf.setOutputCommitter(CommitterWithLongSetupAndCommit.class); |
| RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir); |
| JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
| JobInProgress jip = jt.getJob(job.getID()); |
| while (!jip.inited()) { |
| try { |
| Thread.sleep(10); |
| } catch (InterruptedException ie) {} |
| } |
| return job; |
| } |
| |
| /** |
| * Tests setup and cleanup attempts getting killed from command-line |
| * and lost tracker |
| * |
| * @param mr |
| * @param dfs |
| * @param commandLineKill if true, test with command-line kill |
| * else, test with lost tracker |
| * @throws IOException |
| */ |
| private void testSetupAndCleanupKill(MiniMRCluster mr, |
| MiniDFSCluster dfs, |
| boolean commandLineKill) |
| throws IOException { |
| // launch job with waiting setup/cleanup |
| RunningJob job = launchJobWithWaitingSetupAndCleanup(mr); |
| |
| JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
| JobInProgress jip = jt.getJob(job.getID()); |
| // get the running setup task id |
| TaskAttemptID setupID = getRunningTaskID(jip.getTasks(TaskType.JOB_SETUP)); |
| if (commandLineKill) { |
| killTaskFromCommandLine(job, setupID, jt); |
| } else { |
| killTaskWithLostTracker(mr, setupID); |
| } |
| // signal the setup to complete |
| UtilsForTests.writeFile(dfs.getNameNode(), |
| dfs.getFileSystem().getConf(), |
| setupSignalFile, (short)3); |
| // wait for maps and reduces to complete |
| while (job.reduceProgress() != 1.0f) { |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException ie) {} |
| } |
| // get the running cleanup task id |
| TaskAttemptID cleanupID = |
| getRunningTaskID(jip.getTasks(TaskType.JOB_CLEANUP)); |
| if (commandLineKill) { |
| killTaskFromCommandLine(job, cleanupID, jt); |
| } else { |
| killTaskWithLostTracker(mr, cleanupID); |
| } |
| // signal the cleanup to complete |
| UtilsForTests.writeFile(dfs.getNameNode(), |
| dfs.getFileSystem().getConf(), |
| cleanupSignalFile, (short)3); |
| // wait for the job to finish. |
| job.waitForCompletion(); |
| assertEquals(JobStatus.SUCCEEDED, job.getJobState()); |
| assertEquals(TaskStatus.State.KILLED, |
| jt.getTaskStatus(setupID).getRunState()); |
| assertEquals(TaskStatus.State.KILLED, |
| jt.getTaskStatus(cleanupID).getRunState()); |
| } |
| |
| // kill the task from command-line |
| // wait till it kill is reported back |
| private void killTaskFromCommandLine(RunningJob job, |
| TaskAttemptID taskid, |
| JobTracker jt) |
| throws IOException { |
| job.killTask(taskid, false); |
| // wait till the kill happens |
| while (jt.getTaskStatus(taskid).getRunState() != |
| TaskStatus.State.KILLED) { |
| try { |
| Thread.sleep(10); |
| } catch (InterruptedException ie) {} |
| } |
| |
| } |
| // kill the task by losing the tracker |
| private void killTaskWithLostTracker(MiniMRCluster mr, |
| TaskAttemptID taskid) { |
| JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); |
| String trackerName = jt.getTaskStatus(taskid).getTaskTracker(); |
| int trackerID = mr.getTaskTrackerID(trackerName); |
| assertTrue(trackerID != -1); |
| mr.stopTaskTracker(trackerID); |
| } |
| |
| // Tests the failures in setup/cleanup job. Job should cleanly fail. |
| // Also Tests the command-line kill for setup/cleanup attempts. |
| // tests the setup/cleanup attempts getting killed if |
| // they were running on a lost tracker |
| public void testWithDFS() throws IOException { |
| MiniDFSCluster dfs = null; |
| MiniMRCluster mr = null; |
| FileSystem fileSys = null; |
| try { |
| final int taskTrackers = 4; |
| Configuration conf = new Configuration(); |
| dfs = new MiniDFSCluster(conf, 4, true, null); |
| fileSys = dfs.getFileSystem(); |
| JobConf jtConf = new JobConf(); |
| jtConf.setInt(TTConfig.TT_MAP_SLOTS, 1); |
| jtConf.setInt(TTConfig.TT_REDUCE_SLOTS, 1); |
| jtConf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 10 * 1000); |
| mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, |
| null, null, jtConf); |
| // test setup/cleanup throwing exceptions |
| testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf()); |
| testFailCommitter(CommitterWithFailCommit.class, mr.createJobConf()); |
| // test the command-line kill for setup/cleanup attempts. |
| testSetupAndCleanupKill(mr, dfs, true); |
| // remove setup/cleanup signal files. |
| fileSys.delete(setupSignalFile , true); |
| fileSys.delete(cleanupSignalFile , true); |
| // test the setup/cleanup attempts getting killed if |
| // they were running on a lost tracker |
| testSetupAndCleanupKill(mr, dfs, false); |
| } finally { |
| if (dfs != null) { dfs.shutdown(); } |
| if (mr != null) { mr.shutdown(); |
| } |
| } |
| } |
| |
| public static void main(String[] argv) throws Exception { |
| TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure(); |
| td.testWithDFS(); |
| } |
| } |