blob: ad26a67cb1279c64eb791b73f2db1596113e0dc1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.util;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * Provides functions for working with Hadoop's distributed cache. These
 * include:
 * <ul>
 * <li>
 * Functions for working with a job-specific distributed cache of objects, like
 * the serialized runtime nodes in a MapReduce.</li>
 * <li>
 * Functions for adding library jars to the distributed cache, which will be
 * added to the classpath of MapReduce tasks.</li>
 * </ul>
 */
public class DistCache {

  /**
   * Configuration key for setting the replication factor for files distributed using the Crunch
   * DistCache helper class. This can be used to scale read access for files used by the Crunch
   * framework.
   */
  public static final String DIST_CACHE_REPLICATION = "crunch.distcache.replication";

  // Configuration key holding the comma-separated, fully qualified paths of jars to export to the
  // distributed cache.
  private static final String TMPJARS_KEY = "tmpjars";

  /**
   * Serializes {@code value} to {@code path} using Java serialization and registers the written
   * file with the distributed cache configured by {@code conf}.
   *
   * <p>The file is written with the replication factor read from {@link #DIST_CACHE_REPLICATION},
   * falling back to the file system's default replication for {@code path}.
   *
   * @param conf the configuration used to resolve the file system and to register the cache file
   * @param path the destination of the serialized object
   * @param value the object to serialize with an {@link ObjectOutputStream}
   * @throws IOException if the file cannot be created or the object cannot be written
   */
  public static void write(Configuration conf, Path path, Object value) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    short replication = (short) conf.getInt(DIST_CACHE_REPLICATION, fs.getDefaultReplication(path));
    OutputStream out = fs.create(path, replication);
    ObjectOutputStream oos = null;
    try {
      // The ObjectOutputStream constructor already writes a header and may fail, so it is built
      // inside the try block to guarantee the raw stream is still closed in that case.
      oos = new ObjectOutputStream(out);
      oos.writeObject(value);
    } finally {
      if (oos != null) {
        oos.close(); // also closes the wrapped stream
      } else {
        out.close();
      }
    }
    DistributedCache.addCacheFile(path.toUri(), conf);
  }

  /**
   * Reads back an object previously stored with {@link #write(Configuration, Path, Object)} from
   * its locally cached copy.
   *
   * @param conf the configuration of the running task
   * @param requestedFile the path the object was originally written to; only its file name is used
   *     to locate the local copy
   * @return the deserialized object
   * @throws IOException if the cached location cannot be determined, no local copy exists, or the
   *     file cannot be read
   * @throws CrunchRuntimeException if the class of the serialized object cannot be found
   */
  public static Object read(Configuration conf, Path requestedFile) throws IOException {
    FileSystem localFs = FileSystem.getLocal(conf);
    Path cachedPath;
    try {
      cachedPath = getPathToCacheFile(requestedFile, conf);
    } catch (CrunchRuntimeException cre) {
      throw new IOException("Can not determine cached location for " + requestedFile.toString(), cre);
    }

    if (cachedPath == null || !localFs.exists(cachedPath)) {
      throw new IOException("Expected file with path: " + requestedFile.toString() + " to be cached");
    }

    InputStream in = localFs.open(cachedPath);
    ObjectInputStream ois = null;
    try {
      // The constructor reads the stream header and can fail on an empty or corrupted file; the
      // finally block then still closes the raw input stream.
      ois = new ObjectInputStream(in);
      return ois.readObject();
    } catch (ClassNotFoundException e) {
      throw new CrunchRuntimeException(e);
    } finally {
      if (ois != null) {
        ois.close(); // also closes the wrapped stream
      } else {
        in.close();
      }
    }
  }

  /**
   * Registers {@code path} as a cache file in {@code conf} via
   * {@link DistributedCache#addCacheFile(java.net.URI, Configuration)}.
   *
   * @param path the file to add to the distributed cache
   * @param conf the configuration to register the file in
   */
  public static void addCacheFile(Path path, Configuration conf) {
    DistributedCache.addCacheFile(path.toUri(), conf);
  }

  /**
   * Finds the local copy of a distributed cache file whose file name equals the name of
   * {@code path}.
   *
   * @param path the original path of the cached file; only its last path component is compared
   * @param conf the configuration of the running task
   * @return the matching local path, qualified against the local file system, or {@code null} if
   *     there are no local cache files or none of them has a matching name
   * @throws CrunchRuntimeException if the local cache files cannot be retrieved
   */
  public static Path getPathToCacheFile(Path path, Configuration conf) {
    try {
      Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (localCacheFiles == null) {
        // No cache files are recorded for this configuration.
        return null;
      }
      String requestedName = path.getName();
      for (Path localPath : localCacheFiles) {
        // Compare whole file names: a plain suffix test would wrongly match "ce" to ".../piece".
        if (localPath.getName().equals(requestedName)) {
          return localPath.makeQualified(FileSystem.getLocal(conf));
        }
      }
    } catch (IOException e) {
      throw new CrunchRuntimeException(e);
    }
    return null;
  }

  /**
   * Adds the specified jar to the distributed cache of jobs using the provided
   * configuration. The jar will be placed on the classpath of tasks run by the
   * job.
   *
   * @param conf
   *          The configuration used to add the jar to the distributed cache.
   * @param jarFile
   *          The jar file to add to the distributed cache.
   * @throws IOException
   *           If the jar file does not exist or there is a problem accessing
   *           the file.
   * @throws IllegalArgumentException
   *           If the file name does not end with {@code .jar}.
   */
  public static void addJarToDistributedCache(Configuration conf, File jarFile) throws IOException {
    if (!jarFile.exists()) {
      throw new IOException("Jar file: " + jarFile.getCanonicalPath() + " does not exist.");
    }
    if (!jarFile.getName().endsWith(".jar")) {
      throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar file.");
    }

    // Get a path for the jar that is fully qualified against the local file system.
    FileSystem fileSystem = FileSystem.getLocal(conf);
    Path jarPath = new Path(jarFile.getCanonicalPath());
    String qualifiedPath = jarPath.makeQualified(fileSystem).toString();

    // Append the jar to the comma-separated list held by the configuration variable.
    String jarConfiguration = conf.get(TMPJARS_KEY, "");
    if (!jarConfiguration.isEmpty()) {
      jarConfiguration += ",";
    }
    jarConfiguration += qualifiedPath;
    conf.set(TMPJARS_KEY, jarConfiguration);
  }

  /**
   * Adds the jar at the specified path to the distributed cache of jobs using
   * the provided configuration. The jar will be placed on the classpath of
   * tasks run by the job.
   *
   * @param conf
   *          The configuration used to add the jar to the distributed cache.
   * @param jarFile
   *          The path to the jar file to add to the distributed cache.
   * @throws IOException
   *           If the jar file does not exist or there is a problem accessing
   *           the file.
   * @throws IllegalArgumentException
   *           If the file name does not end with {@code .jar}.
   */
  public static void addJarToDistributedCache(Configuration conf, String jarFile) throws IOException {
    addJarToDistributedCache(conf, new File(jarFile));
  }

  /**
   * Finds the path to a jar that contains the class provided, if any. There is
   * no guarantee that the jar returned will be the first on the classpath to
   * contain the file. This method is basically lifted out of Hadoop's
   * {@link org.apache.hadoop.mapred.JobConf} class.
   *
   * @param jarClass
   *          The class the jar file should contain.
   * @return The path to a jar file that contains the class, or
   *         <code>null</code> if no such jar exists.
   * @throws IOException
   *           If there is a problem searching for the jar file.
   */
  public static String findContainingJar(Class<?> jarClass) throws IOException {
    ClassLoader loader = jarClass.getClassLoader();
    if (loader == null) {
      // Classes loaded by the bootstrap loader report a null class loader. The system class loader
      // delegates resource lookups to the bootstrap loader, so it can still locate them.
      loader = ClassLoader.getSystemClassLoader();
    }
    String classFile = jarClass.getName().replace('.', '/') + ".class";
    for (Enumeration<URL> itr = loader.getResources(classFile); itr.hasMoreElements();) {
      URL url = itr.nextElement();
      if ("jar".equals(url.getProtocol())) {
        String toReturn = url.getPath();
        if (toReturn.startsWith("file:")) {
          toReturn = toReturn.substring("file:".length());
        }
        // URLDecoder is a misnamed class, since it actually decodes
        // x-www-form-urlencoded MIME type rather than actual
        // URL encoding (which the file path has). Therefore it would
        // decode +s to ' 's which is incorrect (spaces are actually
        // either unencoded or encoded as "%20"). Replace +s first, so
        // that they are kept sacred during the decoding process.
        toReturn = toReturn.replaceAll("\\+", "%2B");
        toReturn = URLDecoder.decode(toReturn, "UTF-8");
        // Strip the "!/path/inside/jar" suffix, leaving only the jar's own path.
        return toReturn.replaceAll("!.*$", "");
      }
    }
    return null;
  }

  /**
   * Adds all jars under the specified directory to the distributed cache of
   * jobs using the provided configuration. The jars will be placed on the
   * classpath of tasks run by the job. This method does not descend into
   * subdirectories when adding jars.
   *
   * @param conf
   *          The configuration used to add jars to the distributed cache.
   * @param jarDirectory
   *          A directory containing jar files to add to the distributed cache.
   * @throws IOException
   *           If the directory does not exist, is not a directory, or its
   *           contents cannot be listed.
   */
  public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory) throws IOException {
    if (!jarDirectory.exists() || !jarDirectory.isDirectory()) {
      throw new IOException("Jar directory: " + jarDirectory.getCanonicalPath() + " does not "
          + "exist or is not a directory.");
    }
    File[] files = jarDirectory.listFiles();
    if (files == null) {
      // File.listFiles() signals an I/O error by returning null instead of throwing.
      throw new IOException("Could not list the contents of jar directory: "
          + jarDirectory.getCanonicalPath());
    }
    for (File file : files) {
      if (!file.isDirectory() && file.getName().endsWith(".jar")) {
        addJarToDistributedCache(conf, file);
      }
    }
  }

  /**
   * Adds all jars under the directory at the specified path to the distributed
   * cache of jobs using the provided configuration. The jars will be placed on
   * the classpath of the tasks run by the job. This method does not descend
   * into subdirectories when adding jars.
   *
   * @param conf
   *          The configuration used to add jars to the distributed cache.
   * @param jarDirectory
   *          The path to a directory containing jar files to add to the
   *          distributed cache.
   * @throws IOException
   *           If the directory does not exist, is not a directory, or its
   *           contents cannot be listed.
   */
  public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory) throws IOException {
    addJarDirToDistributedCache(conf, new File(jarDirectory));
  }
}