| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.mapreduce; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.sql.SQLException; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.sqoop.SqoopOptions; |
| import org.apache.sqoop.config.ConfigurationConstants; |
| import org.apache.sqoop.config.ConfigurationHelper; |
| import org.apache.sqoop.manager.ConnManager; |
| import org.apache.sqoop.tool.SqoopTool; |
| import org.apache.sqoop.util.ClassLoaderStack; |
| import org.apache.sqoop.util.Jars; |
| import org.apache.sqoop.validation.ValidationContext; |
| import org.apache.sqoop.validation.ValidationException; |
| import org.apache.sqoop.validation.ValidationFailureHandler; |
| import org.apache.sqoop.validation.ValidationThreshold; |
| import org.apache.sqoop.validation.Validator; |
| |
| /** |
| * Base class for configuring and running a MapReduce job. |
| * Allows dependency injection, etc., for easy customization of import job types. |
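| * |
| * <p>A minimal lifecycle sketch, assuming a hypothetical subclass |
| * MyImportJob that supplies its own mapper and formats: |
| * <pre>{@code |
| * JobBase base = new MyImportJob(opts); |
| * Job job = base.createJob(conf); |
| * // subclass hooks: configureInputFormat(), configureMapper(), |
| * // configureOutputFormat(), configureNumTasks(), then runJob(job) |
| * }</pre> |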
| */ |
| public class JobBase { |
| |
| public static final Log LOG = LogFactory.getLog(JobBase.class.getName()); |
| |
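| /** |
| * Configuration key; when set to true (e.g. via |
| * -Dsqoop.jobbase.serialize.sqoopoptions=true), createJob() serializes |
| * the active SqoopOptions into the job configuration under the |
| * "sqoop.opt." prefix. |
| */ |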
| public static final String SERIALIZE_SQOOPOPTIONS = "sqoop.jobbase.serialize.sqoopoptions"; |
| public static final boolean SERIALIZE_SQOOPOPTIONS_DEFAULT = false; |
| public static final String HADOOP_MAP_TASK_MAX_ATTEMPTS = |
| "mapreduce.map.maxattempts"; |
| public static final String HADOOP_REDUCE_TASK_MAX_ATTEMPTS = |
| "mapreduce.reduce.maxattempts"; |
| |
| protected SqoopOptions options; |
| protected Class<? extends Mapper> mapperClass; |
| protected Class<? extends InputFormat> inputFormatClass; |
| protected Class<? extends OutputFormat> outputFormatClass; |
| |
| private Job mrJob; |
| |
| private ClassLoader prevClassLoader = null; |
| protected final boolean isHCatJob; |
| |
| public static final String PROPERTY_VERBOSE = "sqoop.verbose"; |
| |
| public JobBase() { |
| this(null); |
| } |
| |
| public JobBase(final SqoopOptions opts) { |
| this(opts, null, null, null); |
| } |
| |
| public JobBase(final SqoopOptions opts, |
| final Class<? extends Mapper> mapperClass, |
| final Class<? extends InputFormat> inputFormatClass, |
| final Class<? extends OutputFormat> outputFormatClass) { |
| |
| this.options = opts; |
| this.mapperClass = mapperClass; |
| this.inputFormatClass = inputFormatClass; |
| this.outputFormatClass = outputFormatClass; |
| // 'options' may be null when the no-argument constructor is used. |
| isHCatJob = options != null && options.getHCatTableName() != null; |
| } |
| |
| /** |
| * @return the mapper class to use for the job. |
| */ |
| protected Class<? extends Mapper> getMapperClass() |
| throws ClassNotFoundException { |
| return this.mapperClass; |
| } |
| |
| /** |
| * @return the inputformat class to use for the job. |
| */ |
| protected Class<? extends InputFormat> getInputFormatClass() |
| throws ClassNotFoundException { |
| return this.inputFormatClass; |
| } |
| |
| /** |
| * @return the outputformat class to use for the job. |
| */ |
| protected Class<? extends OutputFormat> getOutputFormatClass() |
| throws ClassNotFoundException { |
| return this.outputFormatClass; |
| } |
| |
| /** Set the OutputFormat class to use for this job. */ |
| public void setOutputFormatClass(Class<? extends OutputFormat> cls) { |
| this.outputFormatClass = cls; |
| } |
| |
| /** Set the InputFormat class to use for this job. */ |
| public void setInputFormatClass(Class<? extends InputFormat> cls) { |
| this.inputFormatClass = cls; |
| } |
| |
| /** Set the Mapper class to use for this job. */ |
| public void setMapperClass(Class<? extends Mapper> cls) { |
| this.mapperClass = cls; |
| } |
| |
| /** |
| * Set the SqoopOptions configuring this job. |
| */ |
| public void setOptions(SqoopOptions opts) { |
| this.options = opts; |
| } |
| |
| /** |
| * Put jar files required by Sqoop into the DistributedCache. |
| * @param job the Job being submitted. |
| * @param mgr the ConnManager to use. |
| */ |
| protected void cacheJars(Job job, ConnManager mgr) |
| throws IOException { |
| if (options.isSkipDistCache()) { |
| LOG.info("Not adding sqoop jars to distributed cache as requested"); |
| return; |
| } |
| |
| Configuration conf = job.getConfiguration(); |
| FileSystem fs = FileSystem.getLocal(conf); |
| Set<String> localUrls = new HashSet<String>(); |
| |
| addToCache(Jars.getSqoopJarPath(), fs, localUrls); |
| if (null != mgr) { |
| addToCache(Jars.getDriverClassJar(mgr), fs, localUrls); |
| addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls); |
| } |
| |
| SqoopTool tool = this.options.getActiveSqoopTool(); |
| if (null != tool) { |
| // Make sure the jar for the tool itself is on the classpath. (In case |
| // this is a third-party plugin tool.) |
| addToCache(Jars.getJarPathForClass(tool.getClass()), fs, localUrls); |
| List<String> toolDeps = tool.getDependencyJars(); |
| if (null != toolDeps) { |
| for (String depFile : toolDeps) { |
| addToCache(depFile, fs, localUrls); |
| } |
| } |
| } |
| |
| // Add anything in $SQOOP_HOME/lib, if this is set. |
| String sqoopHome = System.getenv("SQOOP_HOME"); |
| if (null != sqoopHome) { |
| File sqoopHomeFile = new File(sqoopHome); |
| File sqoopLibFile = new File(sqoopHomeFile, "lib"); |
| if (sqoopLibFile.exists()) { |
| addDirToCache(sqoopLibFile, fs, localUrls); |
| } |
| } else { |
| LOG.warn("SQOOP_HOME is unset. May not be able to find " |
| + "all job dependencies."); |
| } |
| |
| // If the user runs an import into Hive as Parquet files, |
| // add anything in $HIVE_HOME/lib. |
| if (options.doHiveImport() && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) { |
| String hiveHome = options.getHiveHome(); |
| if (null != hiveHome) { |
| File hiveHomeFile = new File(hiveHome); |
| File hiveLibFile = new File(hiveHomeFile, "lib"); |
| if (hiveLibFile.exists()) { |
| addDirToCache(hiveLibFile, fs, localUrls); |
| } |
| } else { |
| LOG.warn("HIVE_HOME is unset. Cannot add hive libs as dependencies."); |
| } |
| } |
| |
| String tmpjars = conf.get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM); |
| StringBuilder sb = new StringBuilder(); |
| |
| // If we didn't put anything in our set, then there's nothing to cache. |
| if (localUrls.isEmpty() && (org.apache.commons.lang.StringUtils.isEmpty(tmpjars))) { |
| return; |
| } |
| |
| if (null != tmpjars) { |
| String[] tmpjarsElements = tmpjars.split(","); |
| for (String jarElement : tmpjarsElements) { |
| if (jarElement.isEmpty()) { |
| warn("Empty input is invalid and was removed from tmpjars."); |
| } else { |
| sb.append(jarElement); |
| sb.append(","); |
| } |
| } |
| } |
| |
| int lastComma = sb.lastIndexOf(","); |
| if (localUrls.isEmpty() && lastComma >= 0) { |
| sb.deleteCharAt(lastComma); |
| } |
| |
| // Add these to the 'tmpjars' array, which the MR JobSubmitter |
| // will upload to HDFS and put in the DistributedCache libjars. |
| sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0]))); |
| conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, sb.toString()); |
| } |
| |
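| /** Log a warning message. Protected so that subclasses can override it. */ |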
| protected void warn(String message) { |
| LOG.warn(message); |
| } |
| |
| private void addToCache(String file, FileSystem fs, Set<String> localUrls) { |
| if (null == file) { |
| return; |
| } |
| |
| Path p = new Path(file); |
| String qualified = fs.makeQualified(p).toString(); |
| LOG.debug("Adding to job classpath: " + qualified); |
| localUrls.add(qualified); |
| } |
| |
| /** |
| * Add the .jar elements of a directory to the DCache classpath, |
| * nonrecursively. |
| */ |
| private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) { |
| if (null == dir) { |
| return; |
| } |
| |
| File[] libFiles = dir.listFiles(); |
| if (null == libFiles) { |
| // listFiles() returns null if 'dir' is not a readable directory. |
| return; |
| } |
| for (File libfile : libFiles) { |
| if (libfile.exists() && !libfile.isDirectory() |
| && libfile.getName().endsWith(".jar")) { |
| addToCache(libfile.toString(), fs, localUrls); |
| } |
| } |
| } |
| |
| /** |
| * If jars must be loaded into the local environment, do so here. |
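| * |
| * <p>Callers are expected to pair this with {@link #unloadJars()}; a |
| * typical pattern in a subclass would be: |
| * <pre>{@code |
| * loadJars(conf, ormJarFile, tableClassName); |
| * try { |
| *   // configure and run the job... |
| * } finally { |
| *   unloadJars(); |
| * } |
| * }</pre> |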
| */ |
| protected void loadJars(Configuration conf, String ormJarFile, |
| String tableClassName) throws IOException { |
| |
| if (ConfigurationHelper.isLocalJobTracker(conf)) { |
| // If we're using the LocalJobRunner, then instead of using the compiled |
| // jar file as the job source, we're running in the current thread. Push |
| // on another classloader that loads from that jar in addition to |
| // everything currently on the classpath. |
| this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile, |
| tableClassName); |
| } |
| } |
| |
| /** |
| * If any classloader was invoked by loadJars, free it here. |
| */ |
| protected void unloadJars() { |
| if (null != this.prevClassLoader) { |
| // unload the special classloader for this jar. |
| ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader); |
| } |
| } |
| |
| /** |
| * Configure the inputformat to use for the job. |
| */ |
| protected void configureInputFormat(Job job, String tableName, |
| String tableClassName, String splitByCol) |
| throws ClassNotFoundException, IOException { |
| //TODO: 'splitByCol' is import-job specific; lift it out of this API. |
| Class<? extends InputFormat> ifClass = getInputFormatClass(); |
| LOG.debug("Using InputFormat: " + ifClass); |
| job.setInputFormatClass(ifClass); |
| } |
| |
| /** |
| * Configure the output format to use for the job. |
| */ |
| protected void configureOutputFormat(Job job, String tableName, |
| String tableClassName) throws ClassNotFoundException, IOException { |
| Class<? extends OutputFormat> ofClass = getOutputFormatClass(); |
| LOG.debug("Using OutputFormat: " + ofClass); |
| job.setOutputFormatClass(ofClass); |
| } |
| |
| /** |
| * Set the mapper class implementation to use in the job, |
| * as well as any related configuration (e.g., map output types). |
| */ |
| protected void configureMapper(Job job, String tableName, |
| String tableClassName) throws ClassNotFoundException, IOException { |
| job.setMapperClass(getMapperClass()); |
| } |
| |
| /** |
| * Configure the number of map/reduce tasks to use in the job, |
| * returning the number of map tasks for backward compatibility. |
| */ |
| protected int configureNumTasks(Job job) throws IOException { |
| int numMapTasks = configureNumMapTasks(job); |
| configureNumReduceTasks(job); |
| return numMapTasks; |
| } |
| |
| /** |
| * Configure the number of map tasks to use in the job. |
| */ |
| protected int configureNumMapTasks(Job job) throws IOException { |
| int numMapTasks = options.getNumMappers(); |
| if (numMapTasks < 1) { |
| numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS; |
| LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers."); |
| } |
| ConfigurationHelper.setJobNumMaps(job, numMapTasks); |
| return numMapTasks; |
| } |
| |
| /** |
| * Configure the number of reduce tasks to use in the job. |
| */ |
| protected int configureNumReduceTasks(Job job) throws IOException { |
| job.setNumReduceTasks(0); |
| return 0; |
| } |
| |
| /** Set the main job that will be run. */ |
| protected void setJob(Job job) { |
| mrJob = job; |
| } |
| |
| /** |
| * @return the main MapReduce job that is being run, or null if no |
| * job has started. |
| */ |
| public Job getJob() { |
| return mrJob; |
| } |
| |
| /** |
| * Create a new Job object in a unified way for all types of jobs. |
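| * |
| * <p>A sketch of opting in to SqoopOptions serialization (see |
| * {@link #SERIALIZE_SQOOPOPTIONS}): |
| * <pre>{@code |
| * Configuration conf = new Configuration(); |
| * conf.setBoolean(SERIALIZE_SQOOPOPTIONS, true); |
| * Job job = createJob(conf); // options land under "sqoop.opt.*" |
| * }</pre> |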
| * |
| * @param configuration Hadoop configuration that should be used |
| * @return a new Job object; the created object is not persisted in this instance |
| */ |
| public Job createJob(Configuration configuration) throws IOException { |
| // Put the SqoopOptions into job if requested |
| if(configuration.getBoolean(SERIALIZE_SQOOPOPTIONS, SERIALIZE_SQOOPOPTIONS_DEFAULT)) { |
| putSqoopOptionsToConfiguration(options, configuration); |
| } |
| |
| return Job.getInstance(configuration); |
| } |
| |
| /** |
| * Iterates over the serialized form of the SqoopOptions and puts each |
| * entry into the target Configuration object. |
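| * |
| * <p>For example, a serialized property named "foo" (the name is |
| * illustrative only) becomes retrievable as: |
| * <pre>{@code |
| * putSqoopOptionsToConfiguration(opts, conf); |
| * String value = conf.get("sqoop.opt.foo"); |
| * }</pre> |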
| * |
| * @param opts SqoopOptions that should be serialized |
| * @param configuration Target configuration object |
| */ |
| public void putSqoopOptionsToConfiguration(SqoopOptions opts, Configuration configuration) { |
| for(Map.Entry<Object, Object> e : opts.writeProperties().entrySet()) { |
| String key = (String)e.getKey(); |
| String value = (String)e.getValue(); |
| |
| // We don't need to check for empty values here because |
| // SqoopOptions.writeProperties() has already filtered them out. |
| configuration.set("sqoop.opt." + key, value); |
| } |
| } |
| |
| /** |
| * Actually run the MapReduce job. |
| */ |
| protected boolean runJob(Job job) throws ClassNotFoundException, IOException, |
| InterruptedException { |
| return job.waitForCompletion(true); |
| } |
| |
| /** |
| * Display a notice in the log that the current MapReduce job has |
| * been retired, and thus Counters are unavailable. |
| * @param log the Log to display the info to. |
| */ |
| protected void displayRetiredJobNotice(Log log) { |
| log.info("The MapReduce job has already been retired. Performance"); |
| log.info("counters are unavailable. To get this information, "); |
| log.info("you will need to enable the completed job store on "); |
| log.info("the jobtracker with:"); |
| log.info("mapreduce.jobtracker.persist.jobstatus.active = true"); |
| log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1"); |
| log.info("A jobtracker restart is required for these settings"); |
| log.info("to take effect."); |
| } |
| |
| /** |
| * Propagate selected options to the constructed job so that they can be |
| * easily accessed at runtime. Only global options of general interest |
| * (such as the verbose flag) are propagated. |
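| * |
| * <p>A mapper can then read the flag back, e.g. (where context is a |
| * Mapper.Context): |
| * <pre>{@code |
| * boolean verbose = context.getConfiguration() |
| *     .getBoolean(JobBase.PROPERTY_VERBOSE, false); |
| * }</pre> |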
| * |
| * @param job Destination job to save options |
| */ |
| protected void propagateOptionsToJob(Job job) { |
| Configuration configuration = job.getConfiguration(); |
| |
| // So far, propagate only verbose flag |
| configuration.setBoolean(PROPERTY_VERBOSE, options.getVerbose()); |
| } |
| |
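| /** |
| * @return the row count of the given table, as reported by the database |
| * through the supplied ConnManager. |
| */ |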
| protected long getRowCountFromDB(ConnManager connManager, String tableName) |
| throws SQLException { |
| return connManager.getTableRowCount(tableName); |
| } |
| |
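| /** |
| * @return the number of records processed by the map phase of the given |
| * job, read from its map output records counter. |
| */ |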
| protected long getRowCountFromHadoop(Job job) |
| throws IOException, InterruptedException { |
| return ConfigurationHelper.getNumMapOutputRecords(job); |
| } |
| |
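| /** |
| * Run the configured validation: instantiate the Validator, |
| * ValidationThreshold, and ValidationFailureHandler classes named in the |
| * options, log the chosen configuration, and delegate to the validator. |
| */ |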
| protected void doValidate(SqoopOptions options, Configuration conf, |
| ValidationContext validationContext) |
| throws ValidationException { |
| Validator validator = (Validator) ReflectionUtils.newInstance( |
| options.getValidatorClass(), conf); |
| ValidationThreshold threshold = (ValidationThreshold) |
| ReflectionUtils.newInstance(options.getValidationThresholdClass(), |
| conf); |
| ValidationFailureHandler failureHandler = (ValidationFailureHandler) |
| ReflectionUtils.newInstance(options.getValidationFailureHandlerClass(), |
| conf); |
| |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Validating the integrity of the import using the " |
| + "following configuration\n"); |
| sb.append("\tValidator : ").append(validator.getClass().getName()) |
| .append('\n'); |
| sb.append("\tThreshold Specifier : ") |
| .append(threshold.getClass().getName()).append('\n'); |
| sb.append("\tFailure Handler : ") |
| .append(failureHandler.getClass().getName()).append('\n'); |
| LOG.info(sb.toString()); |
| validator.validate(validationContext, threshold, failureHandler); |
| } |
| |
| } |