blob: f306964ad8c8245a7de901478b94d0377e281152 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.ArrayList;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
 * Tests that {@link JobInProgressListener}s registered with the
 * {@link JobTracker} are informed as expected about job submission,
 * priority and start-time updates, and job completion (failed, killed,
 * successful, and killed-while-queued jobs).
 * <p>
 * All test cases share one {@link MiniMRCluster} that {@link #suite()} starts
 * before the first test and shuts down after the last one. The cluster uses
 * {@link MyScheduler}, which lets tests stop and restart the eager task
 * initializer.
 */
public class TestJobInProgressListener extends TestCase {
  private static final Log LOG =
    LogFactory.getLog(TestJobInProgressListener.class);
  // Root directory for test data, as a URI string with spaces replaced by '+'.
  private static final String TEST_ROOT_DIR = new File(System.getProperty(
      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
  private final Path testDir =
    new Path(TEST_ROOT_DIR, "test-jip-listener-update");
  // Cluster-wide state shared by all test cases; initialized and cleared by
  // the TestSetup wrapper built in suite().
  private static MiniMRCluster mr;
  private static JobTracker jobtracker;
  private static JobConf conf;
  private static MyScheduler myScheduler;

  /**
   * Builds the suite and wraps it so that a single mini MR cluster is
   * started before the first test and shut down after the last one.
   * The cluster has one tracker, uses the local file system, and has
   * {@link MyScheduler} configured as its task scheduler.
   */
  public static Test suite() {
    TestSetup setup =
      new TestSetup(new TestSuite(TestJobInProgressListener.class)) {
      @Override
      protected void setUp() throws Exception {
        conf = new JobConf();
        conf.setClass(JTConfig.JT_TASK_SCHEDULER, MyScheduler.class,
                      TaskScheduler.class);
        mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
        jobtracker = mr.getJobTrackerRunner().getJobTracker();
        myScheduler = (MyScheduler)jobtracker.getScheduler();
        // from here on, tests submit jobs with the cluster's job conf
        conf = mr.createJobConf();
      }

      @Override
      protected void tearDown() throws Exception {
        conf = null;
        try {
          if (mr != null) {
            mr.shutdown();
          }
        } catch (Exception e) {
          LOG.info("Error in shutting down the MR cluster", e);
        }
        mr = null;
        jobtracker = null;
        // setUp() may have failed before the scheduler was looked up
        if (myScheduler != null) {
          myScheduler.terminate();
          myScheduler = null;
        }
      }
    };
    return setup;
  }

  /**
   * Tests that external updates to a JobInProgress do not garble the queue
   * kept by a {@link JobQueueJobInProgressListener}.
   * <p>
   * The test runs as follows:
   * <ul>
   * <li>stop the job initializer, so submitted jobs stay queued</li>
   * <li>submit 2 jobs of normal priority (job1, then job2)</li>
   * <li>raise job2's priority to HIGH and check that job2 is now ahead of
   * job1 in the queue</li>
   * <li>reset job2 to NORMAL priority, move its start time before job1's,
   * notify the listener directly, and check that job2 is again ahead of
   * job1</li>
   * </ul>
   */
  public void testJobQueueChanges() throws IOException {
    LOG.info("Testing job queue changes");

    // stop the job initializer
    myScheduler.stopInitializer();

    JobQueueJobInProgressListener myListener =
      new JobQueueJobInProgressListener();

    // add the listener
    jobtracker.addJobInProgressListener(myListener);
    try {
      // use a private copy so the shared configuration is left untouched
      JobConf jobConf = new JobConf(conf);

      Path inDir = new Path(testDir, "input");
      Path outputDir1 = new Path(testDir, "output1");
      Path outputDir2 = new Path(testDir, "output2");

      RunningJob rJob1 =
        UtilsForTests.runJob(jobConf, inDir, outputDir1, 1, 0);
      LOG.info("Running job " + rJob1.getID().toString());

      RunningJob rJob2 =
        UtilsForTests.runJob(jobConf, inDir, outputDir2, 1, 0);
      LOG.info("Running job " + rJob2.getID().toString());

      // I. Check job-priority change
      LOG.info("Testing job priority changes");

      // bump up job2's priority
      LOG.info("Increasing job2's priority to HIGH");
      rJob2.setJobPriority("HIGH");

      // check if the queue is sane
      assertEquals("Priority change garbles the queue",
                   2, myListener.getJobQueue().size());

      JobInProgress[] queue =
        myListener.getJobQueue().toArray(new JobInProgress[0]);
      // verify the length before indexing into the array
      assertEquals("Priority change has garbled the queue",
                   2, queue.length);

      // check if the bump has happened
      assertTrue("Priority change failed to bump up job2 in the queue",
                 queue[0].getJobID().equals(rJob2.getID()));

      assertTrue("Priority change failed to bump down job1 in the queue",
                 queue[1].getJobID().equals(rJob1.getID()));

      // II. Check start-time change
      LOG.info("Testing job start-time changes");

      // reset the priority, which makes the order
      //  - job1
      //  - job2
      // so that a start-time change is needed to bump job2 up again
      LOG.info("Decreasing job2's priority to NORMAL");
      rJob2.setJobPriority("NORMAL");

      // create the change event
      JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
      JobInProgress jip1 = jobtracker.getJob(rJob1.getID());

      JobStatus prevStatus = (JobStatus)jip2.getStatus().clone();

      // make job2 appear to have started before job1
      jip2.startTime = jip1.startTime - 1;
      jip2.status.setStartTime(jip2.startTime);

      JobStatus newStatus = (JobStatus)jip2.getStatus().clone();

      // inform the listener
      LOG.info("Updating the listener about job2's start-time change");
      JobStatusChangeEvent event =
        new JobStatusChangeEvent(jip2, EventType.START_TIME_CHANGED,
                                 prevStatus, newStatus);
      myListener.jobUpdated(event);

      // check if the queue is sane
      assertEquals("Start time change garbles the queue",
                   2, myListener.getJobQueue().size());

      queue = myListener.getJobQueue().toArray(new JobInProgress[0]);
      // verify the length before indexing into the array
      assertEquals("Start time change has garbled the queue",
                   2, queue.length);

      // check if the bump has happened
      assertTrue("Start time change failed to bump up job2 in the queue",
                 queue[0].getJobID().equals(rJob2.getID()));

      assertTrue("Start time change failed to bump down job1 in the queue",
                 queue[1].getJobID().equals(rJob1.getID()));
    } finally {
      // do not leave this listener attached for the other test cases
      jobtracker.removeJobInProgressListener(myListener);
    }
  }

  /**
   * Checks the queue state maintained by {@link MyListener} for each of:
   * <ul>
   * <li>a job killed while still queued (initializer stopped)</li>
   * <li>a failed job</li>
   * <li>a killed job</li>
   * <li>successful jobs, including jobs with no tasks</li>
   * </ul>
   */
  public void testJobCompletion() throws Exception {
    MyListener mainListener = new MyListener();
    jobtracker.addJobInProgressListener(mainListener);
    try {
      // the helpers below set per-job options; keep them off the shared conf
      JobConf jobConf = new JobConf(conf);

      // stop the job initializer
      myScheduler.stopInitializer();

      // check queued jobs
      testQueuedJobKill(jobConf, mainListener);

      myScheduler.startInitializer();

      // check the queue state for job states
      testFailedJob(jobConf, mainListener);

      testKilledJob(jobConf, mainListener);

      testSuccessfulJob(jobConf, mainListener);
    } finally {
      // do not leave this listener attached for the other test cases
      jobtracker.removeJobInProgressListener(mainListener);
    }
  }

  /**
   * A listener that tracks which jobs are currently in the waiting queue
   * (added but not yet running) and in the running queue. It also records
   * every job that was ever added to each queue.
   * <p>
   * Callbacks arrive from JobTracker threads, while {@link #contains} is
   * called from the test thread, so all accesses are synchronized on the
   * listener.
   */
  public static class MyListener extends JobInProgressListener {
    // jobs currently waiting / running
    private final List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
    private final List<JobInProgress> rjobs = new ArrayList<JobInProgress>();
    // every job ever added to the waiting queue
    private final List<JobID> wjobsAdded = new ArrayList<JobID>();
    // every job ever added to the running queue
    private final List<JobID> rjobsAdded = new ArrayList<JobID>();

    /**
     * Returns true if the job is currently in the waiting queue or in the
     * running queue.
     *
     * @throws RuntimeException if the job was never added to the waiting
     *         queue, or if it is not waiting and was never added to the
     *         running queue
     */
    public synchronized boolean contains (JobID id) {
      return contains(id, true) || contains(id, false);
    }

    /**
     * Returns true if the job is currently in the waiting queue (when
     * {@code waiting} is true) or in the running queue (otherwise).
     *
     * @throws RuntimeException if the job was never added to the waiting
     *         queue, or, when {@code waiting} is false, never added to the
     *         running queue
     */
    public synchronized boolean contains (JobID id, boolean waiting) {
      if (!wjobsAdded.contains(id)) {
        throw new RuntimeException("Job " + id + " not seen in waiting queue");
      }
      if (!waiting) {
        if (!rjobsAdded.contains(id)) {
          throw new RuntimeException("Job " + id + " not seen in run queue");
        }
      }
      List<JobInProgress> queue = waiting ? wjobs : rjobs;
      for (JobInProgress job : queue) {
        if (job.getJobID().equals(id)) {
          return true;
        }
      }
      return false;
    }

    /** Records a newly submitted job in the waiting queue. */
    public synchronized void jobAdded(JobInProgress job) {
      LOG.info("Job " + job.getJobID().toString() + " added");
      wjobs.add(job);
      wjobsAdded.add(job.getJobID());
    }

    /** Drops the job from whichever queue currently holds it. */
    public synchronized void jobRemoved(JobInProgress job) {
      LOG.info("Job " + job.getJobID().toString() + " removed");
      wjobs.remove(job);
      rjobs.remove(job);
    }

    /**
     * On a run-state change, moves a job from the waiting queue to the
     * running queue. When the job has completed, it is removed from the
     * queue it was in instead. All other events are ignored.
     */
    public synchronized void jobUpdated(JobChangeEvent event) {
      LOG.info("Job " + event.getJobInProgress().getJobID().toString() + " updated");
      // remove the job if the event is for a completed job
      if (event instanceof JobStatusChangeEvent) {
        JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
        if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
          JobInProgress jip = event.getJobInProgress();
          String jobId = jip.getJobID().toString();
          if (jip.isComplete()) {
            // PREP/RUNNING -> COMPLETE (SUCCESS/KILLED/FAILED); a job that
            // completes from PREP never reached the running queue
            if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
              LOG.info("Job " + jobId + " deleted from the waiting queue");
              wjobs.remove(jip);
            } else {
              LOG.info("Job " + jobId + " deleted from the running queue");
              rjobs.remove(jip);
            }
          } else {
            // PREP->RUNNING
            LOG.info("Job " + jobId + " deleted from the waiting queue");
            wjobs.remove(jip);
            rjobs.add(jip);
            rjobsAdded.add(jip.getJobID());
          }
        }
      }
    }
  }

  /**
   * Submits a single-map job that fails, using a single map attempt. Checks
   * that the listener no longer holds it and that it ended up FAILED.
   */
  private void testFailedJob(JobConf job, MyListener myListener)
  throws IOException {
    LOG.info("Testing job-fail");

    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");

    job.setNumMapTasks(1);
    job.setNumReduceTasks(0);
    job.setMaxMapAttempts(1);

    // submit a job that fails
    RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir);
    JobID id = rJob.getID();

    // check if the job failure was notified
    assertFalse("Missing event notification on failing a running job",
                myListener.contains(id));

    // check if failed
    assertEquals("Job failed!", JobStatus.FAILED, rJob.getJobState());
  }

  /**
   * Submits and kills a single-map job. Checks that the listener no longer
   * holds it and that it ended up KILLED.
   */
  private void testKilledJob(JobConf job, MyListener myListener)
  throws IOException {
    LOG.info("Testing job-kill");

    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");

    job.setNumMapTasks(1);
    job.setNumReduceTasks(0);

    // submit and kill the job
    RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir);
    JobID id = rJob.getID();

    // check if the job kill was notified
    assertFalse("Missing event notification on killing a running job",
                myListener.contains(id));

    // check if killed
    assertEquals("Job failed!", JobStatus.KILLED, rJob.getJobState());
  }

  /**
   * Runs three successful jobs and checks for each that the listener no
   * longer holds it and that it SUCCEEDED:
   * <ul>
   * <li>a one-map job</li>
   * <li>a job with no tasks but with setup/cleanup</li>
   * <li>a job with no tasks and no setup/cleanup, submitted through the
   * new API</li>
   * </ul>
   */
  private void testSuccessfulJob(JobConf job, MyListener myListener)
  throws Exception {
    LOG.info("Testing job-success");

    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");

    job.setNumMapTasks(1);
    job.setNumReduceTasks(0);

    // submit the job
    RunningJob rJob = UtilsForTests.runJobSucceed(job, inDir, outDir);

    // wait for the job to be successful
    rJob.waitForCompletion();

    // check if the job success was notified
    assertFalse("Missing event notification for a successful job",
                myListener.contains(rJob.getID()));

    // check if successful
    assertEquals("Job failed!", JobStatus.SUCCEEDED, rJob.getJobState());

    // test if 0-task jobs with setup-cleanup works fine
    LOG.info("Testing job with no task job with setup and cleanup");

    job.setNumMapTasks(0);
    job.setNumReduceTasks(0);

    outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks");

    // submit the job
    rJob = UtilsForTests.runJobSucceed(job, inDir, outDir);

    // wait for the job to be successful
    rJob.waitForCompletion();

    // check if the job success was notified
    assertFalse("Missing event notification for a successful job with no tasks",
                myListener.contains(rJob.getID(), true));

    // check if successful
    assertEquals("Job failed!", JobStatus.SUCCEEDED, rJob.getJobState());

    // test if jobs with no tasks (0 maps, 0 red) update the listener properly
    LOG.info("Testing job with no-set-cleanup no task");

    outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks-no-set");

    Job j = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 0, 0);
    j.setJobSetupCleanupNeeded(false);
    j.setOutputFormatClass(TestNoJobSetupCleanup.MyOutputFormat.class);
    j.submit();
    j.waitForCompletion(true);

    // convert the new-API id without assuming its concrete class
    JobID id = JobID.downgrade(j.getID());

    // check that the job has left the waiting queue
    assertFalse("Missing event notification on no-set-cleanup no task job",
                myListener.contains(id, true));

    // check that this job (not the previous one) is successful
    assertTrue("Job status doesnt reflect success", j.isSuccessful());
  }

  /**
   * A {@link JobQueueTaskScheduler} whose eager task initializer the tests
   * can stop and restart. While the initializer is stopped, it does not
   * initialize submitted jobs; the tests rely on this to keep jobs in the
   * waiting queue.
   */
  public static class MyScheduler extends JobQueueTaskScheduler {
    @Override
    public synchronized void start() throws IOException {
      super.start();
    }

    /** Detaches and terminates the current eager task initializer. */
    void stopInitializer() throws IOException {
      // Remove the eager task initializer
      taskTrackerManager.removeJobInProgressListener(
          eagerTaskInitializationListener);
      // terminate it
      eagerTaskInitializationListener.terminate();
    }

    /** Creates, starts and attaches a fresh eager task initializer. */
    void startInitializer() throws IOException {
      eagerTaskInitializationListener =
        new EagerTaskInitializationListener(getConf());
      eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
      // start it
      eagerTaskInitializationListener.start();
      // add the eager task initializer
      taskTrackerManager.addJobInProgressListener(
          eagerTaskInitializationListener);
    }
  }

  /**
   * Submits a job while the initializer is stopped and checks that it shows
   * up in the waiting queue. Then kills it and checks that it ended up KILLED
   * and that the listener has removed it from the waiting queue.
   */
  private void testQueuedJobKill(JobConf job, MyListener myListener)
  throws IOException {
    LOG.info("Testing queued-job-kill");

    Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerqueuedjob/input");
    Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerqueuedjob/output");

    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(IdentityReducer.class);
    job.setNumMapTasks(1);
    job.setNumReduceTasks(0);

    RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
    JobID id = rJob.getID();
    LOG.info("Job : " + id.toString() + " submitted");

    // check if the job is in the waiting queue
    assertTrue("Missing event notification on submiting a job",
                myListener.contains(id, true));

    // kill the job
    LOG.info("Killing job : " + id.toString());
    rJob.killJob();

    // check if the job is killed
    assertEquals("Job status doesnt reflect the kill-job action",
                 JobStatus.KILLED, rJob.getJobState());

    // check if the job is correctly moved
    // from the waiting list
    assertFalse("Missing event notification on killing a waiting job",
                myListener.contains(id, true));
  }
}