blob: e6d605cbc0760fad98ae157bb54097c65d84b6f4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.accumulo;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.config.ConfigurationConstants;
import org.apache.sqoop.config.ConfigurationHelper;
/**
* This class provides a method that checks if Accumulo jars are present in the
* current classpath. It also provides a setAlwaysNoAccumuloJarMode mechanism
* for testing and simulating the condition where there is no Accumulo jar
* (since Accumulo is pulled in automatically by ivy).
*/
public final class AccumuloUtil {
  private static final Log LOG = LogFactory.getLog(AccumuloUtil.class);

  /** When true, {@link #isAccumuloJarPresent()} unconditionally returns false. */
  private static boolean testingMode = false;

  /** Class that is probed to decide whether Accumulo is on the classpath. */
  private static final String INSTANCE_CLASS
      = "org.apache.accumulo.core.client.Instance";

  // Prevent instantiation
  private AccumuloUtil() {
  }

  /**
   * This is a way to make {@link #isAccumuloJarPresent()} always return false
   * for testing.
   *
   * @param mode true to force "no Accumulo jar" behaviour, false to restore
   *     the real classpath probe
   */
  public static void setAlwaysNoAccumuloJarMode(boolean mode) {
    testingMode = mode;
  }

  /**
   * Checks whether the Accumulo client classes can be loaded.
   *
   * @return false if the testing override is active or the probe class cannot
   *     be found on the classpath; true otherwise
   */
  public static boolean isAccumuloJarPresent() {
    if (testingMode) {
      return false;
    }
    try {
      Class.forName(INSTANCE_CLASS);
    } catch (ClassNotFoundException cnfe) {
      // Expected when Accumulo is not installed; this is the "absent" answer.
      return false;
    }
    return true;
  }

  /**
   * Add the Accumulo and Zookeeper jar files to the job's distributed cache
   * list, and register the Accumulo site file as a configuration resource.
   * Nothing is done when the job uses the local job tracker or when the
   * options request that the distributed cache be skipped.
   *
   * @param job the job whose configuration is updated
   * @param options Sqoop options supplying the Accumulo/Zookeeper homes; may
   *     be null, in which case the {@link SqoopOptions} defaults are used
   * @throws IOException propagated from obtaining the local file system
   * @throws IllegalArgumentException if no Accumulo home or no Zookeeper home
   *     can be determined
   */
  public static void addJars(Job job, SqoopOptions options) throws IOException {
    Configuration conf = job.getConfiguration();
    if (ConfigurationHelper.isLocalJobTracker(conf)) {
      LOG.info("Not adding Accumulo jars to distributed cache in local mode");
      return;
    }
    // options is treated as optional everywhere below, so guard it here too.
    if (options != null && options.isSkipDistCache()) {
      LOG.info("Not adding Accumulo jars to distributed cache as requested");
      return;
    }

    String accumuloHome = (options != null) ? options.getAccumuloHome() : null;
    if (accumuloHome == null) {
      accumuloHome = SqoopOptions.getAccumuloHomeDefault();
    }
    String zookeeperHome
        = (options != null) ? options.getZookeeperHome() : null;
    if (zookeeperHome == null) {
      zookeeperHome = SqoopOptions.getZookeeperHomeDefault();
    }

    // Validate before touching the configuration so that a missing home
    // never results in a "null..." resource name or a half-updated job.
    if (accumuloHome == null) {
      throw new IllegalArgumentException("ACCUMULO_HOME is not set.");
    }
    if (zookeeperHome == null) {
      throw new IllegalArgumentException("ZOOKEEPER_HOME is not set.");
    }
    LOG.info("Accumulo job : Accumulo Home = " + accumuloHome);
    LOG.info("Accumulo job : Zookeeper Home = " + zookeeperHome);

    // NOTE(review): Configuration.addResource(String) is documented to look
    // the name up on the classpath rather than the file system - confirm
    // that this is what is intended for a path under ACCUMULO_HOME.
    conf.addResource(accumuloHome + AccumuloConstants.ACCUMULO_SITE_XML_PATH);

    FileSystem fs = FileSystem.getLocal(conf);

    // Start from any libjars already specified; the set removes duplicates.
    Set<String> localUrls = new HashSet<String>();
    localUrls.addAll(conf.getStringCollection(
        ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM));

    File accumuloLib = new File(accumuloHome, "lib");
    LOG.info("Adding jar files under " + accumuloLib.getPath()
        + " to distributed cache");
    addDirToCache(accumuloLib, fs, localUrls, false);

    LOG.info("Adding jar files under " + zookeeperHome
        + " to distributed cache");
    addDirToCache(new File(zookeeperHome), fs, localUrls, false);

    // localUrls already contains the previous value of the property, so it
    // is written back on its own; prepending the old value again would list
    // every pre-existing entry twice.
    conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM,
        StringUtils.arrayToString(localUrls.toArray(new String[0])));
  }

  /**
   * Add the .jar elements of a directory to the DCache classpath, optionally
   * recursively.
   *
   * @param dir directory to scan; ignored when null
   * @param fs file system used to qualify each jar path
   * @param localUrls set that receives the qualified jar locations
   * @param recursive whether sub-directories are scanned as well
   */
  private static void addDirToCache(File dir, FileSystem fs,
      Set<String> localUrls, boolean recursive) {
    if (dir == null) {
      return;
    }
    // listFiles() returns null when dir is not a directory or cannot be
    // read, so take a single snapshot and iterate over that same array.
    File[] fileList = dir.listFiles();
    if (fileList == null) {
      LOG.warn("No files under " + dir
          + " to add to distributed cache for Accumulo job");
      return;
    }
    for (File libFile : fileList) {
      if (libFile.isDirectory()) {
        if (recursive) {
          addDirToCache(libFile, fs, localUrls, recursive);
        }
      } else if (libFile.exists() && libFile.getName().endsWith(".jar")) {
        if (libFile.canRead()) {
          Path p = new Path(libFile.toString());
          String qualified = p.makeQualified(fs).toString();
          LOG.info("Adding to job classpath: " + qualified);
          localUrls.add(qualified);
        } else {
          LOG.warn("Ignoring unreadable file " + libFile);
        }
      }
    }
  }
}