/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import junit.framework.TestCase;

import java.io.*;
import java.util.HashSet;
import java.util.Set;

import org.junit.*;
/**
 * This test checks the jobtracker in safe mode. In safe mode the jobtracker,
 * upon restart, doesn't schedule any new tasks and waits for the (old)
 * trackers to join back.
 */
/** UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests.
 */
@Ignore
public class TestJobTrackerSafeMode extends TestCase {
final Path testDir =
new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
final Path inDir = new Path(testDir, "input");
final Path shareDir = new Path(testDir, "share");
final Path outputDir = new Path(testDir, "output");
final int numDir = 1;
final int numTrackers = 2;
private static final Log LOG =
LogFactory.getLog(TestJobTrackerSafeMode.class);
private JobConf configureJob(JobConf conf, int maps, int reduces,
String mapSignal, String redSignal)
throws IOException {
UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,
maps, reduces, "test-jobtracker-safemode",
mapSignal, redSignal);
return conf;
}
/**
 * Tests the jobtracker's safe mode. The test is as follows:
 * - starts a cluster with 2 trackers
 * - submits a job with a large number (40) of maps to make sure that all the
 *   trackers are logged to the job history
 * - waits for the job to be 50% done
 * - stops the jobtracker
 * - waits for the trackers to be done with all the tasks
 * - kills a task tracker
 * - starts the jobtracker
 * - starts 2 more trackers
 * - now checks that until all the trackers are detected (or lost) the
 *   scheduling window stays closed
 * - checks that after all the trackers are recovered, scheduling is opened
 */
private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr)
throws IOException, InterruptedException {
FileSystem fileSys = dfs.getFileSystem();
JobConf jobConf = mr.createJobConf();
String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();
// Configure the jobs
JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);
fileSys.delete(shareDir, true);
// Submit a master job
JobClient jobClient = new JobClient(job);
RunningJob rJob = jobClient.submitJob(job);
JobID id = rJob.getID();
// wait for the job to be initialized
mr.initializeJob(id);
// Make sure that the master job is 50% completed
while (UtilsForTests.getJobStatus(jobClient, id).mapProgress()
< 0.5f) {
LOG.info("Waiting for the job to be 50% done");
UtilsForTests.waitFor(100);
}
// Kill the jobtracker
mr.stopJobTracker();
// Enable recovery on restart
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
// Signal the maps to complete
UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
// Signal the reducers to complete
UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
redSignalFile);
// wait for the tasks to complete at the tracker
Set<String> trackers = new HashSet<String>();
for (int i = 0; i < numTracker; ++i) {
TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
trackers.add(t.getName());
int runningCount = t.getRunningTaskStatuses().size();
while (runningCount != 0) {
LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
UtilsForTests.waitFor(100);
runningCount = 0;
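// recount only map attempts that are still unassigned or running; once
// every map attempt has moved past these states the tracker is stable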
for (TaskStatus status : t.getRunningTaskStatuses()) {
if (status.getIsMap()
&& (status.getRunState() == TaskStatus.State.UNASSIGNED
|| status.getRunState() == TaskStatus.State.RUNNING)) {
++runningCount;
}
}
}
}
LOG.info("Trackers have stabilized");
// Kill a tasktracker
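// pre-decrement: kill the tracker with the highest index and reuse
// numTracker as the base for the indices of the trackers started below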
int trackerToKill = --numTracker;
TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();
trackers.remove(t.getName()); // remove this from the set to check
Set<String> lostTrackers = new HashSet<String>();
lostTrackers.add(t.getName());
// get the attempt-id's to ignore
// stop the tracker
LOG.info("Stopping tracker : " + t.getName());
mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
mr.stopTaskTracker(trackerToKill);
// Restart the jobtracker
mr.startJobTracker();
// Wait for the JT to be ready
UtilsForTests.waitForJobTracker(jobClient);
jobtracker = mr.getJobTrackerRunner().getJobTracker();
// Start a tracker
LOG.info("Start a new tracker");
mr.startTaskTracker(null, null, ++numTracker, numDir);
// Start a tracker
LOG.info("Start a new tracker");
mr.startTaskTracker(null, null, ++numTracker, numDir);
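// the two new trackers are not part of the recovery set; scheduling should
// stay closed until the original trackers are recovered or declared lost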
// Check if the jobs are still running
// Wait for the tracker to be lost
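// as long as some old tracker has neither re-registered nor been marked
// lost, the recovery manager must report that scheduling is closed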
boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
assertFalse("JobTracker has opened up scheduling before all the"
+ " trackers were recovered", shouldSchedule);
UtilsForTests.waitFor(100);
// snapshot jobtracker's scheduling status
shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
}
assertTrue("JobTracker hasnt opened up scheduling even all the"
+ " trackers were recovered",
jobtracker.recoveryManager.shouldSchedule());
assertEquals("Recovery manager is in inconsistent state",
0, jobtracker.recoveryManager.recoveredTrackers.size());
// wait for the job to be complete
UtilsForTests.waitTillDone(jobClient);
}
private boolean checkTrackers(JobTracker jobtracker, Set<String> present,
Set<String> absent) {
long jobtrackerRecoveryFinishTime =
jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
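// a tracker in 'present' counts as recovered only if it has heartbeated
// after the jobtracker finished recovery; a tracker in 'absent' must not be
// known to the jobtracker at all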
for (String trackerName : present) {
TaskTrackerStatus status = jobtracker.getTaskTrackerStatus(trackerName);
// check if the status is present and also the tracker has contacted back
// after restart
if (status == null
|| status.getLastSeen() < jobtrackerRecoveryFinishTime) {
return false;
}
}
for (String trackerName : absent) {
TaskTrackerStatus status = jobtracker.getTaskTrackerStatus(trackerName);
// check if the status is still present
if (status != null) {
return false;
}
}
return true;
}
/**
* Test {@link JobTracker}'s safe mode.
* @throws InterruptedException
*/
public void testJobTrackerSafeMode() throws IOException, InterruptedException {
String namenode = null;
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
conf.setBoolean("dfs.replication.considerLoad", false);
dfs = new MiniDFSCluster(conf, 1, true, null, null);
dfs.waitActive();
fileSys = dfs.getFileSystem();
// clean up
fileSys.delete(testDir, true);
if (!fileSys.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
// Write the input file
UtilsForTests.writeFile(dfs.getNameNode(), conf,
new Path(inDir + "/file"), (short)1);
dfs.startDataNodes(conf, 1, true, null, null, null, null);
dfs.waitActive();
namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+ (dfs.getFileSystem()).getUri().getPort();
// Make sure that jobhistory leads to a proper job restart
// So keep the blocksize and the buffer size small
JobConf jtConf = new JobConf();
jtConf.set("mapred.jobtracker.job.history.block.size", "512");
jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
jtConf.setInt("mapred.reduce.copy.backoff", 4);
jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);
// Test Lost tracker case
testSafeMode(dfs, mr);
} finally {
if (mr != null) {
try {
mr.shutdown();
} catch (Exception e) {}
}
if (dfs != null) {
try {
dfs.shutdown();
} catch (Exception e) {}
}
}
}
public static void main(String[] args)
throws IOException, InterruptedException {
new TestJobTrackerSafeMode().testJobTrackerSafeMode();
}
}