blob: 2ff43cbe0552f703f166c18d5dcfecceb0c12460 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.io.Writer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Exercises the {@link JobClient} command-line options {@code -counter},
 * {@code -list all} and {@code -set-priority} against a job that is run
 * using the configuration, file system and directories supplied by
 * {@link ClusterMapReduceTestCase}.
 */
public class TestJobClient extends ClusterMapReduceTestCase {

  private static final Log LOG = LogFactory.getLog(TestJobClient.class);

  /**
   * Writes a three-line text file ("text.txt") into the input directory and
   * runs a job named "mr" over it with priority {@code HIGH}, using the
   * identity mapper and identity reducer.
   *
   * @return the string form of the ID of the job that was run
   * @throws Exception propagated from writing the input file or from
   *         {@code JobClient.runJob}
   */
  private String runJob() throws Exception {
    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
    Writer wr = new OutputStreamWriter(os);
    try {
      wr.write("hello1\n");
      wr.write("hello2\n");
      wr.write("hello3\n");
    } finally {
      // Close even when a write fails so the underlying stream is not leaked.
      wr.close();
    }

    JobConf conf = createJobConf();
    conf.setJobName("mr");
    conf.setJobPriority(JobPriority.HIGH);

    conf.setInputFormat(TextInputFormat.class);

    conf.setMapOutputKeyClass(LongWritable.class);
    conf.setMapOutputValueClass(Text.class);

    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
    conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);

    FileInputFormat.setInputPaths(conf, getInputDir());
    FileOutputFormat.setOutputPath(conf, getOutputDir());

    return JobClient.runJob(conf).getID().toString();
  }

  /**
   * Runs {@code tool} through {@link ToolRunner} while {@code System.out} is
   * temporarily redirected to {@code out}. The previous {@code System.out} is
   * restored before this method returns, whether or not the tool throws.
   *
   * @param conf configuration passed to {@code ToolRunner.run}
   * @param tool the tool to run
   * @param args command-line arguments for the tool
   * @param out stream that receives everything printed to {@code System.out}
   *        while the tool runs
   * @return the value returned by {@code ToolRunner.run}
   * @throws Exception propagated from {@code ToolRunner.run}
   */
  private int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
    PrintStream oldOut = System.out;
    PrintStream newOut = new PrintStream(out, true);
    try {
      System.setOut(newOut);
      return ToolRunner.run(conf, tool, args);
    } finally {
      System.setOut(oldOut);
      // Push any pending output through to 'out' before the caller inspects it.
      newOut.flush();
    }
  }

  /**
   * Runs a job and checks that {@code -counter} reports 3 for
   * {@code MAP_INPUT_RECORDS}, matching the three lines written by
   * {@link #runJob()}.
   */
  public void testGetCounter() throws Exception {
    String jobId = runJob();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int exitCode = runTool(createJobConf(), new JobClient(),
        new String[] { "-counter", jobId,
        "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
        out);
    assertEquals("Exit code", 0, exitCode);
    assertEquals("Counter", "3", out.toString().trim());
  }

  /**
   * Runs a job and checks that {@code -list all} shows it with the
   * {@code HIGH} priority it was submitted with.
   */
  public void testJobList() throws Exception {
    String jobId = runJob();
    verifyJobPriority(jobId, "HIGH");
  }

  /**
   * Runs {@code -list all} and asserts that the output line starting with
   * {@code jobId} contains {@code priority}. Fails if no such line exists.
   *
   * <p>The output is captured in an in-memory buffer rather than a piped
   * stream pair: writing to and then reading from a pipe on a single thread
   * can block forever once the pipe's buffer fills, or when the reader waits
   * for a write end that is never closed.
   *
   * @param jobId string form of the job ID to look for at the start of a line
   * @param priority text expected to appear in that job's line
   * @throws Exception propagated from running the tool
   */
  private void verifyJobPriority(String jobId, String priority)
      throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int exitCode = runTool(createJobConf(), new JobClient(),
        new String[] { "-list", "all" },
        out);
    assertEquals("Exit code", 0, exitCode);

    boolean found = false;
    // Accept both "\n" and "\r\n" since println uses the platform separator.
    for (String line : out.toString().split("\r?\n")) {
      LOG.info("line = " + line);
      if (!line.startsWith(jobId)) {
        continue;
      }
      assertTrue("Expected priority " + priority + " in: " + line,
          line.contains(priority));
      found = true;
      break;
    }
    assertTrue("Job " + jobId + " not found in -list all output", found);
  }

  /**
   * Runs a job, changes its priority to {@code VERY_LOW} with
   * {@code -set-priority}, and checks that {@code -list all} reflects the
   * new priority.
   */
  public void testChangingJobPriority() throws Exception {
    String jobId = runJob();
    int exitCode = runTool(createJobConf(), new JobClient(),
        new String[] { "-set-priority", jobId, "VERY_LOW" },
        new ByteArrayOutputStream());
    assertEquals("Exit code", 0, exitCode);
    verifyJobPriority(jobId, "VERY_LOW");
  }
}