blob: 33283259a92b99bb7968be3997bbd14f239793d1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package testshell;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* will be in an external jar and used for
* test in TestJobShell.java.
*/
public class ExternalMapReduce extends Configured implements Tool {
public void configure(JobConf job) {
// do nothing
}
public void close()
throws IOException {
}
// Executes the given shell command. Returns zero on success.
private static int execCommandAndCatchEx(String []argv) throws IOException {
Process p = Runtime.getRuntime().exec(argv);
int ret = -1;
try {
ret = p.waitFor();
} catch(InterruptedException ie) {
//do nothing here.
}
return ret;
}
// Verifies that the given list of files exist on the local file system.
// Throws on failure.
private static void verifyFilesExist(String [] fileList) throws IOException {
// fork off ls to see if the file exists.
// java file.exists() does not work on symlinks on java6
if (Shell.WINDOWS) {
// FIXME: Checking file existence one by one until multiple file
// scenario is supported by winutils
for (int i = 0; i< fileList.length; ++i) {
String[] argv = new String[3];
argv[0] = Shell.WINUTILS;
argv[1] = "ls";
argv[2] = fileList[i];
int ret = execCommandAndCatchEx(argv);
if (ret != 0) {
throw new IOException(fileList[i] + " does not exist");
}
}
} else {
String[] argv = new String[fileList.length + 1];
argv[0] = "ls";
for (int i = 0; i < fileList.length; ++i) {
argv[i + 1] = fileList[i];
}
int ret = execCommandAndCatchEx(argv);
if (ret != 0) {
throw new IOException("files_tmp does not exist");
}
}
}
// Check if the given item exists in classpath
private static boolean itemInClasspath(String elementName)
throws FileNotFoundException, IOException {
boolean found = false;
String classpath = System.getProperty("java.class.path");
// Check if the element exists in the classpath
if (classpath.indexOf(elementName) >= 0) {
found = true;
} else {
// In case of Windows, classpath is embedded in a referencing jar
if (Shell.WINDOWS) {
String[] cpFiles = classpath.split(File.pathSeparator);
if (cpFiles != null && cpFiles.length == 1 &&
cpFiles[0].endsWith(".jar")) {
// Search for the element in the jar manifest
found = itemInJarClasspath(elementName, cpFiles[0]);
}
}
}
return found;
}
// Check if the given item exists as a classpath element in a jar file
private static boolean itemInJarClasspath(String elementName,
String jarFileName)
throws FileNotFoundException, IOException{
// Load the Jar manifest
JarInputStream jarStream = new JarInputStream(
new FileInputStream(jarFileName));
Manifest jarManifest = jarStream.getManifest();
String classpath = jarManifest.getMainAttributes().getValue(
Attributes.Name.CLASS_PATH.toString());
// Check for the element in the classpath list
return (classpath.indexOf(elementName) >= 0);
}
public static class MapClass extends MapReduceBase
implements Mapper<WritableComparable, Writable,
WritableComparable, IntWritable> {
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, IntWritable> output,
Reporter reporter)
throws IOException {
//check for classpath
String classpath = System.getProperty("java.class.path");
if (!itemInClasspath("testjob.jar")) {
throw new IOException("failed to find in the library " + classpath);
}
if (!itemInClasspath("test.jar")) {
throw new IOException("failed to find the library test.jar in"
+ classpath);
}
String[] expectedFileList = { "files_tmp", "localfilelink",
"dfsfilelink", "tarlink", "ziplink", "test.tgz", "jarlink" };
verifyFilesExist(expectedFileList);
File file = new File("./jarlink/test.txt");
if (!FileUtil.canExecute(file)) {
throw new IOException("jarlink/test.txt is not executable");
}
}
}
public static class Reduce extends MapReduceBase
implements Reducer<WritableComparable, Writable,
WritableComparable, IntWritable> {
public void reduce(WritableComparable key, Iterator<Writable> values,
OutputCollector<WritableComparable, IntWritable> output,
Reporter reporter)
throws IOException {
//do nothing
}
}
public int run(String[] argv) throws IOException {
if (argv.length < 2) {
System.out.println("ExternalMapReduce <input> <output>");
return -1;
}
Path outDir = new Path(argv[1]);
Path input = new Path(argv[0]);
JobConf testConf = new JobConf(getConf(), ExternalMapReduce.class);
//try to load a class from libjar
try {
testConf.getClassByName("testjar.ClassWordCount");
} catch (ClassNotFoundException e) {
System.out.println("Could not find class from libjar");
return -1;
}
testConf.setJobName("external job");
FileInputFormat.setInputPaths(testConf, input);
FileOutputFormat.setOutputPath(testConf, outDir);
testConf.setMapperClass(MapClass.class);
testConf.setReducerClass(Reduce.class);
testConf.setNumReduceTasks(1);
JobClient.runJob(testConf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(),
new ExternalMapReduce(), args);
System.exit(res);
}
}