/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.QueueManager.QueueACL;
import org.apache.hadoop.security.UserGroupInformation;

import junit.framework.TestCase;
import org.junit.*;
/**
* TestJobTrackerRestart checks if the jobtracker can restart. The jobtracker
* should be able to continue running the previously running jobs and also
* recover previously submitted jobs.
*/
/**
* UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests.
*/
@Ignore
public class TestJobTrackerRestart extends TestCase {
static final Path testDir =
new Path(System.getProperty("test.build.data","/tmp"),
"jt-restart-testing");
final Path inDir = new Path(testDir, "input");
static final Path shareDir = new Path(testDir, "share");
final Path outputDir = new Path(testDir, "output");
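// Number of jobs submitted so far; gives each job a unique output dir suffix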
private static int numJobsSubmitted = 0;
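/*
* The restart-recovery behaviour exercised below hinges on one jobtracker
* knob (a minimal sketch, using the 0.20-era key this test sets later):
*
* JobConf jtConf = new JobConf();
* jtConf.setBoolean("mapred.jobtracker.restart.recover", true);
*/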
/**
* Return the job conf configured with the priorities and mappers as passed.
* @param conf The default conf
* @param priorities priorities for the jobs
* @param numMaps number of maps for the jobs
* @param numReds number of reducers for the jobs
* @param outputDir output dir
* @param inDir input dir
* @param mapSignalFile filename that acts as a signal for the maps
* @param reduceSignalFile filename that acts as a signal for the reducers
* @return an array of jobconfs configured as needed
* @throws IOException
*/
private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
int[] numMaps, int[] numReds,
Path outputDir, Path inDir,
String mapSignalFile, String reduceSignalFile)
throws IOException {
JobConf[] jobs = new JobConf[priorities.length];
for (int i = 0; i < jobs.length; ++i) {
jobs[i] = new JobConf(conf);
Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir,
numMaps[i], numReds[i], "jt restart test job", mapSignalFile,
reduceSignalFile);
jobs[i].setJobPriority(priorities[i]);
}
return jobs;
}
/**
* Clean up the signals.
*/
private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
// Delete the map signal file
fileSys.delete(new Path(getMapSignalFile(dir)), false);
// Delete the reduce signal file
fileSys.delete(new Path(getReduceSignalFile(dir)), false);
}
/**
* Tests the jobtracker with restart-recovery turned off.
* Submit a job with normal priority, maps = 2, reducers = 0.
*
* Wait for the job to complete 50% of its maps.
*
* Restart the jobtracker with recovery turned off.
*
* Check that the job is missing after the restart.
*/
public void testRestartWithoutRecovery(MiniDFSCluster dfs,
MiniMRCluster mr)
throws IOException {
// III. Test a job with a waiting mapper and recovery turned off
FileSystem fileSys = dfs.getFileSystem();
cleanUp(fileSys, shareDir);
JobConf newConf = getJobs(mr.createJobConf(),
new JobPriority[] {JobPriority.NORMAL},
new int[] {2}, new int[] {0},
outputDir, inDir,
getMapSignalFile(shareDir),
getReduceSignalFile(shareDir))[0];
JobClient jobClient = new JobClient(newConf);
RunningJob job = jobClient.submitJob(newConf);
JobID id = job.getID();
// make sure that the job is at least 50% map-complete
while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
UtilsForTests.waitFor(100);
}
mr.stopJobTracker();
// Turn off the recovery
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
false);
// Wait for a minute before restarting the jobtracker
UtilsForTests.waitFor(60 * 1000);
mr.startJobTracker();
// Signal the tasks
UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
getReduceSignalFile(shareDir));
// Wait for the JT to be ready
UtilsForTests.waitForJobTracker(jobClient);
UtilsForTests.waitTillDone(jobClient);
// The submitted job should not exist
assertTrue("Submitted job was detected with recovery disabled",
UtilsForTests.getJobStatus(jobClient, id) == null);
}
/** Tests a job on a jobtracker with restart-recovery turned on.
* Preparation :
* - Configure a job with
* - num-maps : 50
* - num-reducers : 1
* - Configure the cluster to run 1 reducer
* - Lower the history file block size and buffer
*
* Wait for the job to complete 50% of its maps. Note that the job is
* configured to use {@link HalfWaitingMapper} and {@link WaitingReducer},
* so the maps will eventually stall at 50% completion.
*
* Make a note of the following things
* - Task completion events
* - Cluster status
* - Task Reports
* - Job start time
*
* Restart the jobtracker
*
* Wait for job to finish all the maps and note the TaskCompletion events at
* the tracker.
*
* Wait for all the jobs to finish and note the following
* - New task completion events at the jobtracker
* - Task reports
* - Cluster status
*
* Check for the following
* - Task completion events for recovered tasks should match
* - Task completion events at the tasktracker and the restarted
* jobtracker should be the same
* - Cluster status should be sane
* - Task reports for recovered tasks should match
* Checks
* - start time
* - finish time
* - counters
* - http-location
* - task-id
* - Job start time should match
* - Check if the counters can be accessed
* - Check if the history files are (re)named properly
*/
public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs,
MiniMRCluster mr)
throws IOException {
// II. Test a tasktracker with a waiting mapper and recovery turned on.
// Ideally the tracker should SYNC with the new/restarted jobtracker
FileSystem fileSys = dfs.getFileSystem();
final int numMaps = 50;
final int numReducers = 1;
cleanUp(fileSys, shareDir);
JobConf newConf = getJobs(mr.createJobConf(),
new JobPriority[] {JobPriority.NORMAL},
new int[] {numMaps}, new int[] {numReducers},
outputDir, inDir,
getMapSignalFile(shareDir),
getReduceSignalFile(shareDir))[0];
JobClient jobClient = new JobClient(newConf);
RunningJob job = jobClient.submitJob(newConf);
JobID id = job.getID();
// change the job priority
mr.setJobPriority(id, JobPriority.HIGH);
mr.initializeJob(id);
// make sure that at least one reducer is spawned
while (jobClient.getClusterStatus().getReduceTasks() == 0) {
UtilsForTests.waitFor(100);
}
while(true) {
// Since we are using a half waiting mapper, maps should be stuck at 50%
TaskCompletionEvent[] trackerEvents =
mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
.getMapTaskCompletionEvents();
if (trackerEvents.length < numMaps / 2) {
UtilsForTests.waitFor(1000);
} else {
break;
}
}
TaskCompletionEvent[] prevEvents =
mr.getTaskCompletionEvents(id, 0, numMaps);
TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
ClusterStatus prevStatus = jobClient.getClusterStatus();
mr.stopJobTracker();
// Turn on the recovery
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
// Wait for a minute before restarting the jobtracker
UtilsForTests.waitFor(60 * 1000);
mr.startJobTracker();
// Signal the map tasks
UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
getReduceSignalFile(shareDir));
// Wait for the JT to be ready
UtilsForTests.waitForJobTracker(jobClient);
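// The job stalls at 50% map completion (half-waiting mapper), so only half
// of the recovered events are expected to match exactly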
int numToMatch = mr.getNumEventsRecovered() / 2;
// make sure that the maps are completed
while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
UtilsForTests.waitFor(100);
}
// Get the new jobtracker's events
TaskCompletionEvent[] jtEvents =
mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
// Test if all the events that were recovered match exactly
testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
// Check the task reports
// The reports should match exactly if the attempts are the same
TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
testTaskReports(prevSetupReports, afterSetupReports, 1);
// check the job priority
assertEquals("Job priority change is not reflected",
JobPriority.HIGH, mr.getJobPriority(id));
List<TaskCompletionEvent> jtMapEvents =
new ArrayList<TaskCompletionEvent>();
for (TaskCompletionEvent tce : jtEvents) {
if (tce.isMapTask()) {
jtMapEvents.add(tce);
}
}
TaskCompletionEvent[] trackerEvents;
while(true) {
// Wait for the tracker to pull all the map events
trackerEvents =
mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
.getMapTaskCompletionEvents();
if (trackerEvents.length < jtMapEvents.size()) {
UtilsForTests.waitFor(1000);
} else {
break;
}
}
// Signal the reduce tasks
UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
getReduceSignalFile(shareDir));
UtilsForTests.waitTillDone(jobClient);
testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]),
trackerEvents, true, -1);
// validate the history file
TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);
// check that the cluster status is sane
ClusterStatus status = jobClient.getClusterStatus();
assertTrue("Cluster status is insane",
checkClusterStatusOnCompletion(status, prevStatus));
}
/**
* Matches the specified number of task reports.
* @param source the reports to be matched
* @param target the reports to match with
* @param numToMatch the number of reports to match
*/
private void testTaskReports(TaskReport[] source, TaskReport[] target,
int numToMatch) {
for (int i = 0; i < numToMatch; ++i) {
// Check if the task reports were recovered correctly
assertTrue("Task report for the same attempt has changed",
source[i].equals(target[i]));
}
}
/**
* Matches the task completion events.
* @param source the events to be matched
* @param target the events to match with
* @param fullMatch whether to match the events completely or partially
* @param numToMatch the number of events to match when a full match is not
* desired
*/
private void testTaskCompletionEvents(TaskCompletionEvent[] source,
TaskCompletionEvent[] target,
boolean fullMatch,
int numToMatch) {
// Check if the event list sizes match
// The lengths should match only in the case of a full match
if (fullMatch) {
assertEquals("Map task completion events mismatch",
source.length, target.length);
numToMatch = source.length;
}
// Check if the events match
for (int i = 0; i < numToMatch; ++i) {
if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){
assertTrue("Map task completion events ordering mismatch",
source[i].equals(target[i]));
}
}
}
private boolean checkClusterStatusOnCompletion(ClusterStatus status,
ClusterStatus prevStatus) {
return status.getJobTrackerState() == prevStatus.getJobTrackerState()
&& status.getMapTasks() == 0
&& status.getReduceTasks() == 0;
}
/** Committer that delays job setup until the share directory appears.
*/
static class CommitterWithDelaySetup extends FileOutputCommitter {
@Override
public void setupJob(JobContext context) throws IOException {
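// Block job setup until the shared signal directory shows up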
FileSystem fs = FileSystem.get(context.getConfiguration());
while (true) {
if (fs.exists(shareDir)) {
break;
}
UtilsForTests.waitFor(100);
}
super.cleanupJob(context);
}
}
/** Tests a job on a jobtracker with restart-recovery turned on and an empty
* jobhistory file.
* Preparation :
* - Configure a job with
* - num-maps : 0 (long waiting setup)
* - num-reducers : 0
*
* Check if the job succeeds after the restart.
*
* Assumes that map slots are given to setup tasks first.
*/
public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs,
MiniMRCluster mr)
throws IOException {
mr.startTaskTracker(null, null, 1, 1);
FileSystem fileSys = dfs.getFileSystem();
cleanUp(fileSys, shareDir);
cleanUp(fileSys, inDir);
cleanUp(fileSys, outputDir);
JobConf conf = mr.createJobConf();
conf.setNumReduceTasks(0);
conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
fileSys.delete(outputDir, false);
RunningJob job1 =
UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
conf.setNumReduceTasks(0);
conf.setOutputCommitter(CommitterWithDelaySetup.class);
Path inDir2 = new Path(testDir, "input2");
fileSys.mkdirs(inDir2);
Path outDir2 = new Path(testDir, "output2");
fileSys.delete(outDir2, false);
JobConf newConf = getJobs(mr.createJobConf(),
new JobPriority[] {JobPriority.NORMAL},
new int[] {10}, new int[] {0},
outDir2, inDir2,
getMapSignalFile(shareDir),
getReduceSignalFile(shareDir))[0];
JobClient jobClient = new JobClient(newConf);
RunningJob job2 = jobClient.submitJob(newConf);
JobID id = job2.getID();
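// Fetch the job from the jobtracker and force its initialization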
JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
mr.getJobTrackerRunner().getJobTracker().initJob(jip);
// find out the history filename
String history =
JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
// get the conf file name
String parts[] = history.split("_");
// jobtracker-hostname_jobtracker-identifier_job-id_conf.xml
String jobUniqueString = parts[0] + "_" + parts[1] + "_" + id;
Path confPath = new Path(historyPath.getParent(), jobUniqueString + "_conf.xml");
// make sure that setup is launched
while (jip.runningMaps() == 0) {
UtilsForTests.waitFor(100);
}
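// Now switch to the first job and force its initialization as well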
id = job1.getID();
jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
mr.getJobTrackerRunner().getJobTracker().initJob(jip);
// make sure that cleanup is launched and is waiting
while (!jip.isCleanupLaunched()) {
UtilsForTests.waitFor(100);
}
mr.stopJobTracker();
// Delete the history file, just to be safe.
FileSystem historyFS = historyPath.getFileSystem(conf);
historyFS.delete(historyPath, false);
historyFS.create(historyPath).close(); // create an empty file
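// Signal the tasks so that both jobs can make progress after the restart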
UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);
// Turn on the recovery
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
mr.startJobTracker();
job1.waitForCompletion();
job2.waitForCompletion();
// check if the old files are deleted
assertFalse("Old jobhistory file is not deleted", historyFS.exists(historyPath));
assertFalse("Old jobconf file is not deleted", historyFS.exists(confPath));
}
public void testJobTrackerRestart() throws IOException {
String namenode = null;
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
conf.setBoolean("dfs.replication.considerLoad", false);
dfs = new MiniDFSCluster(conf, 1, true, null, null);
dfs.waitActive();
fileSys = dfs.getFileSystem();
// clean up
fileSys.delete(testDir, true);
if (!fileSys.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
// Write the input file
UtilsForTests.writeFile(dfs.getNameNode(), conf,
new Path(inDir + "/file"), (short)1);
dfs.startDataNodes(conf, 1, true, null, null, null, null);
dfs.waitActive();
namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+ (dfs.getFileSystem()).getUri().getPort();
// Make sure that the jobhistory itself exercises the restart path,
// so keep the block size and the buffer size small
JobConf jtConf = new JobConf();
jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
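// Enable mapreduce ACLs so that the queue ACL set below takes effect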
jtConf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
// get the user group info
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
jtConf.set(QueueManager.toFullPropertyName("default",
QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
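// Bring up a mini MR cluster with one tasktracker on top of the mini DFS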
mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
// Test the tasktracker SYNC
testTaskEventsAndReportsWithRecovery(dfs, mr);
// Test jobtracker with restart-recovery turned off
testRestartWithoutRecovery(dfs, mr);
// test recovery with an empty history file
testJobRecoveryWithEmptyHistory(dfs, mr);
} finally {
if (mr != null) {
try {
mr.shutdown();
} catch (Exception e) {}
}
if (dfs != null) {
try {
dfs.shutdown();
} catch (Exception e) {}
}
}
}
private static String getMapSignalFile(Path dir) {
return (new Path(dir, "jt-restart-map-signal")).toString();
}
private static String getReduceSignalFile(Path dir) {
return (new Path(dir, "jt-restart-reduce-signal")).toString();
}
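// Allow running this test standalone from the command line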
public static void main(String[] args) throws IOException {
new TestJobTrackerRestart().testJobTrackerRestart();
}
}