/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test whether the {@link RecoveryManager} is able to tolerate job-recovery
* failures and the jobtracker is able to tolerate {@link RecoveryManager}
* failure.
*/
public class TestRecoveryManager {
private static final Log LOG =
LogFactory.getLog(TestRecoveryManager.class);
private static final Path TEST_DIR =
new Path(System.getProperty("test.build.data", "/tmp"),
"test-recovery-manager");
private FileSystem fs;
private MiniDFSCluster dfs;
private MiniMRCluster mr;
@Before
public void setUp() throws IOException {
fs = FileSystem.get(new Configuration());
fs.delete(TEST_DIR, true);
}
private void startCluster() throws IOException {
startCluster(new JobConf());
}
private void startCluster(JobConf conf) throws IOException {
mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
}
@After
public void tearDown() {
try {
if (mr != null) {
ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
.getClusterStatus(false);
if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
mr.shutdown();
}
}
} finally {
mr = null;
try {
if (dfs != null) {
dfs.shutdown();
}
} finally {
dfs = null;
}
}
}
/**
* Tests the {@link JobTracker} against the exceptions thrown in
* {@link JobTracker.RecoveryManager}. It does the following :
* - submits 2 jobs
* - kills the jobtracker
* - deletes the info file for one job
* - restarts the jobtracker
* - checks if the jobtracker starts normally
*/
@Test(timeout=120000)
public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
LOG.info("Testing jobtracker restart with faulty job");
startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
JobConf job1 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job1,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0,
"test-recovery-manager", signalFile, signalFile);
// submit the faulty job
RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
LOG.info("Submitted job " + rJob1.getID());
while (rJob1.mapProgress() < 0.5f) {
LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
JobConf job2 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job2,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0,
"test-recovery-manager", signalFile, signalFile);
// submit another job
RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
LOG.info("Submitted job " + rJob2.getID());
while (rJob2.mapProgress() < 0.5f) {
LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
// kill the jobtracker
LOG.info("Stopping jobtracker");
String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
mr.stopJobTracker();
// delete the info file of job #1, causing the job to fail in its constructor
Path jobFile =
new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
LOG.info("Deleting job token file : " + jobFile.toString());
Assert.assertTrue(fs.delete(jobFile, false)); // delete the job.xml file
// create the job.xml file with 1 bytes
FSDataOutputStream out = fs.create(jobFile);
out.write(1);
out.close();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
ClusterStatus status = jobtracker.getClusterStatus(false);
// check if the jobtracker came up or not
Assert.assertEquals("JobTracker crashed!",
JobTracker.State.RUNNING, status.getJobTrackerState());
// wait for job 2 to complete
JobInProgress jip = jobtracker.getJob(rJob2.getID());
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
// Signaling Map task to complete
fs.create(new Path(TEST_DIR, "signal"));
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
}
/**
* Tests re-submission of jobs when the jobtracker dies and restarts :
* - submits a job and waits for it to be 50% done.
* - submits a second job with job-recovery disabled and lets it be inited.
* - kills the jobtracker.
* - checks that the jobtracker restarts normally and that only the first
*   job is recovered and runs to completion.
*/
@Test(timeout=120000)
public void testJobResubmission() throws Exception {
LOG.info("Testing Job Resubmission");
startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()
.setBoolean("mapred.jobtracker.restart.recover", true);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
JobConf job1 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
signalFile);
JobClient jc1 = new JobClient(job1);
RunningJob rJob1 = jc1.submitJob(job1);
LOG.info("Submitted first job " + rJob1.getID());
while (rJob1.mapProgress() < 0.5f) {
LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
// now submit job2
JobConf job2 = mr.createJobConf();
String signalFile1 = new Path(TEST_DIR, "signal1").toString();
UtilsForTests.configureWaitingJobConf(job2,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0,
"test-recovery-manager", signalFile1, signalFile1);
job2.setBoolean(JobConf.MAPREDUCE_RECOVER_JOB, false); // don't recover
// submit the job
RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
LOG.info("Submitted job " + rJob2.getID());
// wait for it to init
JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
while (!jip2.inited()) {
LOG.info("Waiting for job " + jip2.getJobID() + " to be inited");
UtilsForTests.waitFor(100);
}
// kill the jobtracker
LOG.info("Stopping jobtracker");
mr.stopJobTracker();
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
UtilsForTests.waitForJobTracker(jc1);
jobtracker = mr.getJobTrackerRunner().getJobTracker();
// assert that only job1 is recovered by the jobtracker
Assert.assertEquals("Resubmission failed ", 1,
jobtracker.getAllJobs().length);
// wait for job 1 to complete
JobInProgress jip = jobtracker.getJob(rJob1.getID());
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
// Signaling Map task to complete
fs.create(new Path(TEST_DIR, "signal"));
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job1.get("mapreduce.job.dir"))));
Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job2.get("mapreduce.job.dir"))));
}
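/**
 * A {@link JobTrackerInstrumentation} stub whose first {@code finalizeJob()}
 * call counts down a latch and then throws, letting the test detect the
 * moment a job's tasks are done but the job has not yet been finalized.
 */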
public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
static CountDownLatch finalizeCall = new CountDownLatch(1);
public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
super(jt, conf);
}
public void finalizeJob(JobConf conf, JobID id) {
if (finalizeCall.getCount() == 0) {
return;
}
finalizeCall.countDown();
throw new IllegalStateException("Controlled error finalizing job");
}
}
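/**
 * Tests that a job whose tasks have all completed, but which was not yet
 * finalized when the jobtracker went down, is recovered and completed after
 * the jobtracker restarts.
 */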
@Test
public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
LOG.info("Testing Job Resubmission");
JobConf conf = new JobConf();
// make sure that the jobtracker is in recovery mode
conf.setBoolean("mapred.jobtracker.restart.recover", true);
// use a test JobTrackerInstrumentation implementation to detect the point
// where the tasks have all completed but the job is not yet finalized, so
// the jobtracker can be stopped there and job recovery verified on restart
conf.setClass("mapred.jobtracker.instrumentation", TestJobTrackerInstrumentation.class,
JobTrackerInstrumentation.class);
startCluster(conf);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
SleepJob job = new SleepJob();
job.setConf(mr.createJobConf());
JobConf job1 = job.setupJobConf(1, 0, 1, 1, 1, 1);
JobClient jc = new JobClient(job1);
RunningJob rJob1 = jc.submitJob(job1);
LOG.info("Submitted first job " + rJob1.getID());
TestJobTrackerInstrumentation.finalizeCall.await();
// kill the jobtracker
LOG.info("Stopping jobtracker");
mr.stopJobTracker();
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
UtilsForTests.waitForJobTracker(jc);
jobtracker = mr.getJobTrackerRunner().getJobTracker();
// assert that job is recovered by the jobtracker
Assert.assertEquals("Resubmission failed ", 1,
jobtracker.getAllJobs().length);
// wait for job 1 to complete
JobInProgress jip = jobtracker.getJob(rJob1.getID());
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
}
/**
* Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
* during recovery. It does the following :
* - submits a job with HIGH priority and x tasks
* - allows it to complete 50%
* - submits another job with normal priority and y tasks
* - submits a third job as a different user (which will violate the queue
*   ACLs that are enabled on restart)
* - kills the jobtracker
* - restarts the jobtracker with max-tasks-per-job such that
*   y < max-tasks-per-job < x, and with queue ACLs enabled
* - checks if the jobtracker starts normally and job#2 is recovered while
*   job#1 and job#3 are not.
*/
@Test(timeout=120000)
public void testJobTrackerRestartWithBadJobs() throws Exception {
LOG.info("Testing recovery-manager");
startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()
.setBoolean("mapred.jobtracker.restart.recover", true);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
JobConf job1 = mr.createJobConf();
// set the high priority
job1.setJobPriority(JobPriority.HIGH);
UtilsForTests.configureWaitingJobConf(job1,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
"test-recovery-manager", signalFile, signalFile);
// submit the faulty job
JobClient jc = new JobClient(job1);
RunningJob rJob1 = jc.submitJob(job1);
LOG.info("Submitted first job " + rJob1.getID());
while (rJob1.mapProgress() < 0.5f) {
LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
// now submit job2
JobConf job2 = mr.createJobConf();
String signalFile1 = new Path(TEST_DIR, "signal1").toString();
UtilsForTests.configureWaitingJobConf(job2,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
"test-recovery-manager", signalFile1, signalFile1);
// submit the job
RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
LOG.info("Submitted job " + rJob2.getID());
// wait for it to init
JobInProgress jip = jobtracker.getJob(rJob2.getID());
while (!jip.inited()) {
LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
UtilsForTests.waitFor(100);
}
// now submit job3 with inappropriate acls
final JobConf job3 = mr.createJobConf();
UserGroupInformation ugi3 =
UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
UtilsForTests.configureWaitingJobConf(job3,
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
"test-recovery-manager", signalFile, signalFile);
// submit the job
RunningJob rJob3 = ugi3.doAs(new PrivilegedExceptionAction<RunningJob>() {
public RunningJob run() throws IOException {
return (new JobClient(job3)).submitJob(job3);
}
});
LOG.info("Submitted job " + rJob3.getID() + " with different user");
jip = jobtracker.getJob(rJob3.getID());
while (!jip.inited()) {
LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
UtilsForTests.waitFor(100);
}
// kill the jobtracker
LOG.info("Stopping jobtracker");
mr.stopJobTracker();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
mr.getJobTrackerConf().setBoolean(JobConf.MR_ACLS_ENABLED, true);
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
mr.getJobTrackerConf().set(QueueManager.toFullPropertyName(
"default", QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
UtilsForTests.waitForJobTracker(jc);
jobtracker = mr.getJobTrackerRunner().getJobTracker();
// assert that job2 is recovered by the jobtracker as job1 would fail
Assert.assertEquals("Recovery manager failed to tolerate job failures", 1,
jobtracker.getAllJobs().length);
// check that job#1 was not resubmitted
JobStatus status = jobtracker.getJobStatus(rJob1.getID());
Assert.assertNull("Faulty job should not be resubmitted", status);
jip = jobtracker.getJob(rJob2.getID());
Assert.assertFalse("Job should be running", jip.isComplete());
status = jobtracker.getJobStatus(rJob3.getID());
Assert.assertNull("Job should be missing because of ACL changed", status);
// wait for job 2 to complete
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
// Signaling Map task to complete
fs.create(new Path(TEST_DIR, "signal1"));
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
}
/**
* Test if the restart count of the jobtracker is correctly managed.
* Steps are as follows :
* - starts the jobtracker and submits a waiting job.
* - restarts the jobtracker a couple of times and checks that the recovered
*   job's restart count stays 0 (the job is resubmitted on every restart).
* - submits a new job and checks that its restart count is also 0.
* - garbles the jobtracker.info file and restarts the jobtracker; the
*   jobtracker should fail to come up.
*/
@Test(timeout=120000)
public void testRestartCount() throws Exception {
LOG.info("Testing Job Restart Count");
startCluster();
String signalFile = new Path(TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()
.setBoolean("mapred.jobtracker.restart.recover", true);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
JobConf job1 = mr.createJobConf();
// set the high priority
job1.setJobPriority(JobPriority.HIGH);
UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
signalFile);
// submit a waiting job
JobClient jc = new JobClient(job1);
RunningJob rJob1 = jc.submitJob(job1);
LOG.info("Submitted first job " + rJob1.getID());
JobInProgress jip = jobtracker.getJob(rJob1.getID());
while (!jip.inited()) {
LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
UtilsForTests.waitFor(100);
}
for (int i = 1; i <= 2; ++i) {
LOG.info("Stopping jobtracker for " + i + " time");
mr.stopJobTracker();
// start the jobtracker
LOG.info("Starting jobtracker for " + i + " time");
mr.startJobTracker();
UtilsForTests.waitForJobTracker(jc);
jobtracker = mr.getJobTrackerRunner().getJobTracker();
// assert if restart count is correct
// It should always be 0 now, as the job is resubmitted on every restart
// rather than recovered.
Assert.assertEquals("Recovery manager failed to recover restart count",
0, jip.getNumRestarts());
}
// kill the old job
rJob1.killJob();
// II. Submit a new job and check if the restart count is 0
JobConf job2 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
signalFile, signalFile);
// submit a new job
RunningJob rJob2 = jc.submitJob(job2);
LOG.info("Submitted first job after restart" + rJob2.getID());
// assert if restart count is correct
jip = jobtracker.getJob(rJob2.getID());
Assert.assertEquals("Restart count for new job is incorrect", 0, jip
.getNumRestarts());
LOG.info("Stopping jobtracker for testing the fs errors");
mr.stopJobTracker();
// check if system.dir problems during recovery kill the jobtracker
Path rFile = jobtracker.recoveryManager.getRestartCountFile();
fs.delete(rFile, false);
FSDataOutputStream out = fs.create(rFile);
out.writeBoolean(true);
out.close();
// start the jobtracker
LOG.info("Starting jobtracker with fs errors");
mr.startJobTracker();
JobTrackerRunner runner = mr.getJobTrackerRunner();
Assert.assertFalse("JobTracker is still alive", runner.isActive());
}
/**
* Test that the jobtracker cannot create its info file while no datanodes
* are available, and that creation succeeds once a datanode is started.
*/
@Test(timeout=120000)
public void testJobTrackerInfoCreation() throws Exception {
LOG.info("Testing jobtracker.info file");
dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+ (dfs.getFileSystem()).getUri().getPort();
// shut down the data nodes
dfs.shutdownDataNodes();
// start the jobtracker
JobConf conf = new JobConf();
FileSystem.setDefaultUri(conf, namenode);
conf.set("mapred.job.tracker", "localhost:0");
conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
JobTracker jobtracker = new JobTracker(conf);
jobtracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_ENTER);
jobtracker.initializeFilesystem();
jobtracker.setSafeModeInternal(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
jobtracker.initialize();
// now check if the update restart count works fine or not
boolean failed = false;
try {
jobtracker.recoveryManager.updateRestartCount();
} catch (IOException ioe) {
failed = true;
}
Assert.assertTrue("JobTracker created info files without datanodes!!!",
failed);
Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
FileSystem fs = dfs.getFileSystem();
Assert.assertFalse("Info file exists after update failure",
fs.exists(restartFile));
Assert.assertFalse("Temporary restart-file exists after update failure",
fs.exists(restartFile));
// start 1 data node
dfs.startDataNodes(conf, 1, true, null, null, null, null);
dfs.waitActive();
failed = false;
try {
jobtracker.recoveryManager.updateRestartCount();
} catch (IOException ioe) {
failed = true;
}
Assert.assertFalse("JobTracker failed to create info files with datanodes!",
failed);
}
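/** Creates {@code dir} on the given filesystem and sets its permissions to {@code mode}. */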
static void mkdirWithPerms(FileSystem fs, String dir, short mode) throws IOException {
Path p = new Path(dir);
fs.mkdirs(p);
fs.setPermission(p, new FsPermission(mode));
}
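/**
 * Tests that a job submitted by a different user is recovered after a
 * jobtracker restart, and that job-status queries made while the jobtracker
 * is still initializing fail with JobTrackerNotYetInitializedException.
 */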
@Test(timeout=120000)
public void testJobResubmissionAsDifferentUser() throws Exception {
LOG.info("Testing Job Resubmission as a different user to the jobtracker");
final Path HDFS_TEST_DIR = new Path("/tmp");
JobConf conf = new JobConf();
dfs = new MiniDFSCluster(conf, 1, true, null);
fs = dfs.getFileSystem();
conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
conf.set("mapred.system.dir", "/mapred");
String mapredSysDir = conf.get("mapred.system.dir");
mkdirWithPerms(fs, mapredSysDir, (short)0700);
fs.setOwner(new Path(mapredSysDir),
UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
mkdirWithPerms(fs, "/user", (short)0777);
mkdirWithPerms(fs, "/mapred", (short)0777);
mkdirWithPerms(fs, "/tmp", (short)0777);
mr =
new MiniMRCluster(
1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()
.setBoolean("mapred.jobtracker.restart.recover", true);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
final JobConf job1 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
signalFile);
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
job1.setUser(ugi.getUserName());
JobClient jc = new JobClient(job1);
RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
public RunningJob run() throws IOException {
JobClient jc = new JobClient(job1);
return jc.submitJob(job1);
}
});
LOG.info("Submitted first job " + rJob1.getID());
while (rJob1.mapProgress() < 0.5f) {
LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
// kill the jobtracker
LOG.info("Stopping jobtracker");
mr.stopJobTracker();
// Blocking JT INIT on restart
mr.getJobTrackerConf().setBoolean(
JobTracker.JT_INIT_CONFIG_KEY_FOR_TESTS, false);
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker(false);
while (!mr.getJobTrackerRunner().isUp()) {
Thread.sleep(100);
}
jobtracker = mr.getJobTrackerRunner().getJobTracker();
Assert.assertNotNull(jobtracker);
// now check for job status ...
// should throw JobTrackerNotYetInitializedException
boolean gotJTNYIException = false;
try {
jobtracker.getJobStatus(rJob1.getID());
} catch (JobTrackerNotYetInitializedException jtnyie) {
LOG.info("Caught JobTrackerNotYetInitializedException", jtnyie);
gotJTNYIException = true;
}
Assert.assertTrue(gotJTNYIException);
jobtracker.setInitDone(true);
UtilsForTests.waitForJobTracker(jc);
// assert that job is recovered by the jobtracker
Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
JobInProgress jip = jobtracker.getJob(rJob1.getID());
// Signaling Map task to complete
fs.create(new Path(HDFS_TEST_DIR, "signal"));
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
UtilsForTests.waitFor(100);
}
rJob1 = jc.getJob(rJob1.getID());
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
}
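/**
 * Tests that a job whose initialization failed (via an injected init
 * exception) is still recovered, and completes, once the jobtracker restarts
 * with the init-exception override enabled.
 */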
@Test(timeout=120000)
public void testJobInitError() throws Exception {
LOG.info("Testing error during Job submission");
final Path HDFS_TEST_DIR = new Path("/tmp");
JobConf conf = new JobConf();
dfs = new MiniDFSCluster(conf, 1, true, null);
fs = dfs.getFileSystem();
conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
conf.set("mapred.system.dir", "/mapred");
String mapredSysDir = conf.get("mapred.system.dir");
mkdirWithPerms(fs, mapredSysDir, (short)0700);
fs.setOwner(new Path(mapredSysDir),
UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
mkdirWithPerms(fs, "/user", (short)0777);
mkdirWithPerms(fs, "/mapred", (short)0777);
mkdirWithPerms(fs, "/tmp", (short)0777);
mr =
new MiniMRCluster(
1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf()
.setBoolean("mapred.jobtracker.restart.recover", true);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
final JobConf job1 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
signalFile);
UserGroupInformation ugi =
UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
job1.setUser(ugi.getUserName());
JobClient jc = new JobClient(job1);
RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
public RunningJob run() throws IOException {
// Job 1 init should fail
job1.setBoolean(JobInProgress.JOB_INIT_EXCEPTION, true);
JobClient jc = new JobClient(job1);
return jc.submitJob(job1);
}
});
LOG.info("Submitted first job " + rJob1.getID());
// kill the jobtracker
LOG.info("Stopping jobtracker");
mr.stopJobTracker();
// start the jobtracker, after turning off job-init exception
LOG.info("Starting jobtracker");
mr.getJobTrackerConf().setBoolean(
JobInProgress.JT_JOB_INIT_EXCEPTION_OVERRIDE, true);
mr.startJobTracker(false);
while (!mr.getJobTrackerRunner().isUp()) {
Thread.sleep(100);
}
jobtracker = mr.getJobTrackerRunner().getJobTracker();
Assert.assertNotNull(jobtracker);
UtilsForTests.waitForJobTracker(jc);
// assert that job is recovered by the jobtracker
Assert.assertEquals("Resubmission failed ", 1, jobtracker.getAllJobs().length);
JobInProgress jip = jobtracker.getJob(rJob1.getID());
// Signaling Map task to complete
fs.create(new Path(HDFS_TEST_DIR, "signal"));
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
UtilsForTests.waitFor(100);
}
rJob1 = jc.getJob(rJob1.getID());
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
}
}