blob: 747cd9f604055d141774c5094752ed4a0304623e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
test CLI class. CLI class implemented the Tool interface.
Here test that CLI sends correct command with options and parameters.
*/
public class TestMRJobClient extends ClusterMapReduceTestCase {
private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
private Job runJob(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n";
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, input);
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
job.waitForCompletion(true);
return job;
}
private Job runJobInBackGround(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n";
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, input);
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
job.submit();
int i = 0;
while (i++ < 200 && job.getJobID() == null) {
LOG.info("waiting for jobId...");
Thread.sleep(100);
}
return job;
}
public static int runTool(Configuration conf, Tool tool, String[] args,
OutputStream out) throws Exception {
LOG.info("args = " + Arrays.toString(args));
PrintStream oldOut = System.out;
PrintStream newOut = new PrintStream(out, true);
try {
System.setOut(newOut);
return ToolRunner.run(conf, tool, args);
} finally {
System.setOut(oldOut);
}
}
private static class BadOutputFormat extends TextOutputFormat<Object, Object> {
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
throw new IOException();
}
}
@Test
public void testJobSubmissionSpecsAndFiles() throws Exception {
Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1);
job.setOutputFormatClass(BadOutputFormat.class);
try {
job.submit();
fail("Should've thrown an exception while checking output specs.");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
Cluster cluster = new Cluster(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
job.getConfiguration());
Path submitJobDir = new Path(jobStagingArea, "JobId");
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
assertFalse("Shouldn't have created a job file if job specs failed.",
FileSystem.get(conf).exists(submitJobFile));
}
/**
* main test method
*/
@Test
public void testJobClient() throws Exception {
Configuration conf = createJobConf();
Job job = runJob(conf);
String jobId = job.getJobID().toString();
// test all jobs list
testAllJobList(jobId, conf);
// test only submitted jobs list
testSubmittedJobList(conf);
// test job counter
testGetCounter(jobId, conf);
// status
testJobStatus(jobId, conf);
// test list of events
testJobEvents(jobId, conf);
// test job history
testJobHistory(jobId, conf);
// test tracker list
testListTrackers(conf);
// attempts list
testListAttemptIds(jobId, conf);
// black list
testListBlackList(conf);
// test method main and help screen
startStop();
// test a change job priority .
testChangingJobPriority(jobId, conf);
// submit job from file
testSubmit(conf);
// kill a task
testKillTask(conf);
// fail a task
testfailTask(conf);
// kill job
testKillJob(conf);
// download job config
testConfig(jobId, conf);
}
/**
* test fail task
*/
private void testfailTask(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// TaskAttemptId is not set
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
assertEquals("Exit code", -1, exitCode);
runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
String answer = new String(out.toByteArray(), "UTF-8");
assertTrue(answer.contains("Killed task " + taid + " by failing it"));
}
/**
* test a kill task
*/
private void testKillTask(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad parameters
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
assertEquals("Exit code", -1, exitCode);
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
String answer = new String(out.toByteArray(), "UTF-8");
assertTrue(answer.contains("Killed task " + taid));
}
/**
* test a kill job
*/
private void testKillJob(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
String jobId = job.getJobID().toString();
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// without jobId
int exitCode = runTool(conf, jc, new String[] { "-kill" }, out);
assertEquals("Exit code", -1, exitCode);
// good parameters
exitCode = runTool(conf, jc, new String[] { "-kill", jobId }, out);
assertEquals("Exit code", 0, exitCode);
String answer = new String(out.toByteArray(), "UTF-8");
assertTrue(answer.contains("Killed job " + jobId));
}
/**
* test submit task from file
*/
private void testSubmit(Configuration conf) throws Exception {
CLI jc = createJobClient();
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, "ping");
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
File fcon = File.createTempFile("config", ".xml");
FileSystem localFs = FileSystem.getLocal(conf);
String fconUri = new Path(fcon.getAbsolutePath())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri()
.toString();
job.getConfiguration().writeXml(new FileOutputStream(fcon));
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad parameters
int exitCode = runTool(conf, jc, new String[] { "-submit" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc,
new String[] { "-submit", fconUri }, out);
assertEquals("Exit code", 0, exitCode);
String answer = new String(out.toByteArray());
// in console was written
assertTrue(answer.contains("Created job "));
}
/**
* test start form console command without options
*/
private void startStop() {
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintStream error = System.err;
System.setErr(new PrintStream(data));
ExitUtil.disableSystemExit();
try {
CLI.main(new String[0]);
fail(" CLI.main should call System.exit");
} catch (ExitUtil.ExitException e) {
ExitUtil.resetFirstExitException();
assertEquals(-1, e.status);
} catch (Exception e) {
} finally {
System.setErr(error);
}
// in console should be written help text
String s = new String(data.toByteArray());
assertTrue(s.contains("-submit"));
assertTrue(s.contains("-status"));
assertTrue(s.contains("-kill"));
assertTrue(s.contains("-set-priority"));
assertTrue(s.contains("-events"));
assertTrue(s.contains("-history"));
assertTrue(s.contains("-list"));
assertTrue(s.contains("-list-active-trackers"));
assertTrue(s.contains("-list-blacklisted-trackers"));
assertTrue(s.contains("-list-attempt-ids"));
assertTrue(s.contains("-kill-task"));
assertTrue(s.contains("-fail-task"));
assertTrue(s.contains("-logs"));
}
/**
* black list
*/
private void testListBlackList(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] {
"-list-blacklisted-trackers", "second in" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-blacklisted-trackers" },
out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(0, counter);
}
/**
* print AttemptIds list
*/
private void testListAttemptIds(String jobId, Configuration conf)
throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids", jobId,
"MAP", "completed" }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(1, counter);
}
/**
* print tracker list
*/
private void testListTrackers(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-list-active-trackers",
"second parameter" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-active-trackers" }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(2, counter);
}
/**
* print job history from file
*/
private void testJobHistory(String jobId, Configuration conf)
throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// Find jhist file
String historyFileUri = null;
RemoteIterator<LocatedFileStatus> it =
getFileSystem().listFiles(new Path("/"), true);
while (it.hasNext() && historyFileUri == null) {
LocatedFileStatus file = it.next();
if (file.getPath().getName().endsWith(".jhist")) {
historyFileUri = file.getPath().toUri().toString();
}
}
assertNotNull("Could not find jhist file", historyFileUri);
for (String historyFileOrJobId : new String[]{historyFileUri, jobId}) {
// Try a bunch of different valid combinations of the command
int exitCode = runTool(conf, jc, new String[]{
"-history",
"all",
historyFileOrJobId,
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanOutput(jobId, out);
File outFile = File.createTempFile("myout", ".txt");
exitCode = runTool(conf, jc, new String[]{
"-history",
"all",
historyFileOrJobId,
"-outfile",
outFile.getAbsolutePath()
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanFileOutput(jobId, out, outFile);
outFile = File.createTempFile("myout", ".txt");
exitCode = runTool(conf, jc, new String[]{
"-history",
"all",
historyFileOrJobId,
"-outfile",
outFile.getAbsolutePath(),
"-format",
"human"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanFileOutput(jobId, out, outFile);
exitCode = runTool(conf, jc, new String[]{
"-history",
historyFileOrJobId,
"-format",
"human"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryHumanOutput(jobId, out);
exitCode = runTool(conf, jc, new String[]{
"-history",
"all",
historyFileOrJobId,
"-format",
"json"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryJSONOutput(jobId, out);
outFile = File.createTempFile("myout", ".txt");
exitCode = runTool(conf, jc, new String[]{
"-history",
"all",
historyFileOrJobId,
"-outfile",
outFile.getAbsolutePath(),
"-format",
"json"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryJSONFileOutput(jobId, out, outFile);
exitCode = runTool(conf, jc, new String[]{
"-history",
historyFileOrJobId,
"-format",
"json"
}, out);
assertEquals("Exit code", 0, exitCode);
checkHistoryJSONOutput(jobId, out);
// Check some bad arguments
exitCode = runTool(conf, jc, new String[]{
"-history",
historyFileOrJobId,
"foo"
}, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[]{
"-history",
historyFileOrJobId,
"-format"
}, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[]{
"-history",
historyFileOrJobId,
"-outfile",
}, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[]{
"-history",
historyFileOrJobId,
"-format",
"foo"
}, out);
fail();
} catch (IllegalArgumentException e) {
// Expected
}
}
try {
runTool(conf, jc, new String[]{
"-history",
"not_a_valid_history_file_or_job_id",
}, out);
fail();
} catch (IllegalArgumentException e) {
// Expected
}
}
private void checkHistoryHumanOutput(String jobId, ByteArrayOutputStream out)
throws IOException, JSONException {
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
br.readLine();
String line = br.readLine();
br.close();
assertEquals("Hadoop job: " + jobId, line);
out.reset();
}
private void checkHistoryJSONOutput(String jobId, ByteArrayOutputStream out)
throws IOException, JSONException {
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
String line = org.apache.commons.io.IOUtils.toString(br);
br.close();
JSONObject json = new JSONObject(line);
assertEquals(jobId, json.getString("hadoopJob"));
out.reset();
}
private void checkHistoryHumanFileOutput(String jobId,
ByteArrayOutputStream out, File outFile)
throws IOException, JSONException {
BufferedReader br = new BufferedReader(new FileReader(outFile));
br.readLine();
String line = br.readLine();
br.close();
assertEquals("Hadoop job: " + jobId, line);
assertEquals(0, out.size());
}
private void checkHistoryJSONFileOutput(String jobId,
ByteArrayOutputStream out, File outFile)
throws IOException, JSONException {
BufferedReader br = new BufferedReader(new FileReader(outFile));
String line = org.apache.commons.io.IOUtils.toString(br);
br.close();
JSONObject json = new JSONObject(line);
assertEquals(jobId, json.getString("hadoopJob"));
assertEquals(0, out.size());
}
/**
* download job config
*/
private void testConfig(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad arguments
int exitCode = runTool(conf, jc, new String[] { "-config" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-config job_invalid foo.xml" },
out);
assertEquals("Exit code", -1, exitCode);
// good arguments
File outFile = File.createTempFile("config", ".xml");
exitCode = runTool(conf, jc, new String[] { "-config", jobId,
outFile.toString()}, out);
assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new FileReader(outFile));
String line = br.readLine();
br.close();
assertEquals("<?xml version=\"1.0\" encoding=\"UTF-8\" " +
"standalone=\"no\"?><configuration>", line);
}
/**
* print job events list
*/
private void testJobEvents(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-events" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-events", jobId, "0", "100" },
out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
String attemptId = ("attempt" + jobId.substring(3));
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(attemptId)) {
counter++;
}
}
assertEquals(2, counter);
}
/**
* print job status
*/
private void testJobStatus(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
int exitCode = runTool(conf, jc, new String[] { "-status" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-status", jobId }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (!line.contains("Job state:")) {
continue;
}
break;
}
assertNotNull(line);
assertTrue(line.contains("SUCCEEDED"));
}
/**
* print counters
*/
public void testGetCounter(String jobId, Configuration conf) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad command
int exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", jobId,
"org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
out);
assertEquals("Exit code", 0, exitCode);
assertEquals("Counter", "3", out.toString().trim());
}
/**
* print a job list
*/
protected void testAllJobList(String jobId, Configuration conf)
throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
int exitCode = runTool(conf, createJobClient(), new String[] { "-list",
"alldata" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(),
// all jobs
new String[] { "-list", "all" }, out);
assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
String line;
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(jobId)) {
counter++;
}
}
assertEquals(1, counter);
out.reset();
}
protected void testSubmittedJobList(Configuration conf) throws Exception {
Job job = runJobInBackGround(conf);
ByteArrayOutputStream out = new ByteArrayOutputStream();
String line;
int counter = 0;
// only submitted
int exitCode =
runTool(conf, createJobClient(), new String[] { "-list" }, out);
assertEquals("Exit code", 0, exitCode);
BufferedReader br =
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
out.toByteArray())));
counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(job.getJobID().toString())) {
counter++;
}
}
// all jobs submitted! no current
assertEquals(1, counter);
}
protected void verifyJobPriority(String jobId, String priority,
Configuration conf, CLI jc) throws Exception {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
int exitCode = runTool(conf, jc, new String[] { "-list", "all" }, pos);
assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(pis));
String line;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (!line.contains(jobId)) {
continue;
}
assertTrue(line.contains(priority));
break;
}
pis.close();
}
public void testChangingJobPriority(String jobId, Configuration conf)
throws Exception {
int exitCode = runTool(conf, createJobClient(),
new String[] { "-set-priority" }, new ByteArrayOutputStream());
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode);
// set-priority is fired after job is completed in YARN, hence need not
// have to update the priority.
verifyJobPriority(jobId, "DEFAULT", conf, createJobClient());
}
protected CLI createJobClient() throws IOException {
return new CLI();
}
}