/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.config.ConfigurationConstants;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.tool.SqoopTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
import com.cloudera.sqoop.util.Jars;

/**
 * Base class for configuring and running a MapReduce job.
 * Allows dependency injection, etc, for easy customization of import job types.
 *
 * <p>Subclasses customize a job by overriding the protected
 * {@code configure*} hooks and by supplying the Mapper / InputFormat /
 * OutputFormat classes either through the constructor or the setters.</p>
 */
public class JobBase {

  public static final Log LOG = LogFactory.getLog(JobBase.class.getName());

  /**
   * Configuration key; when it is set to true, {@link #createJob} copies the
   * SqoopOptions into the job configuration.
   */
  public static final String SERIALIZE_SQOOPOPTIONS =
      "sqoop.jobbase.serialize.sqoopoptions";

  /** Value used when {@link #SERIALIZE_SQOOPOPTIONS} is not set. */
  public static final boolean SERIALIZE_SQOOPOPTIONS_DEFAULT = false;

  // NOTE: the two identifiers below are misspelled ("ATTEMTPS"). They are
  // public constants, so the names are kept as-is for source compatibility.

  /** Configuration key string "mapreduce.map.maxattempts". */
  public static final String HADOOP_MAP_TASK_MAX_ATTEMTPS =
      "mapreduce.map.maxattempts";

  /** Configuration key string "mapreduce.reduce.maxattempts". */
  public static final String HADOOP_REDUCE_TASK_MAX_ATTEMTPS =
      "mapreduce.reduce.maxattempts";

  /** Options configuring this job; may be replaced via {@link #setOptions}. */
  protected SqoopOptions options;
  protected Class<? extends Mapper> mapperClass;
  protected Class<? extends InputFormat> inputFormatClass;
  protected Class<? extends OutputFormat> outputFormatClass;

  /** The job registered through {@link #setJob}; null until then. */
  private Job mrJob;

  /**
   * Value returned by ClassLoaderStack.addJarFile() in {@link #loadJars};
   * null if no extra classloader was pushed.
   */
  private ClassLoader prevClassLoader = null;

  /**
   * True when the options given to the constructor name an HCatalog table.
   * Computed once at construction time; it is NOT refreshed by
   * {@link #setOptions}.
   */
  protected final boolean isHCatJob;

  /** Configuration key under which the verbose flag is propagated to a job. */
  public static final String PROPERTY_VERBOSE = "sqoop.verbose";

  /**
   * Create a job base without options or classes; they can be supplied later
   * through the setters.
   */
  public JobBase() {
    this(null);
  }

  /**
   * Create a job base with the given options and no mapper, input format or
   * output format class.
   *
   * @param opts the SqoopOptions configuring this job; may be null.
   */
  public JobBase(final SqoopOptions opts) {
    this(opts, null, null, null);
  }

  /**
   * Create a fully specified job base.
   *
   * @param opts the SqoopOptions configuring this job; may be null.
   * @param mapperClass the Mapper implementation to use.
   * @param inputFormatClass the InputFormat implementation to use.
   * @param outputFormatClass the OutputFormat implementation to use.
   */
  public JobBase(final SqoopOptions opts,
      final Class<? extends Mapper> mapperClass,
      final Class<? extends InputFormat> inputFormatClass,
      final Class<? extends OutputFormat> outputFormatClass) {

    this.options = opts;
    this.mapperClass = mapperClass;
    this.inputFormatClass = inputFormatClass;
    this.outputFormatClass = outputFormatClass;
    // The no-arg constructor delegates here with null options, so guard the
    // dereference; without options this cannot be an HCatalog job.
    this.isHCatJob = (null != opts) && (null != opts.getHCatTableName());
  }

  /**
   * @return the mapper class to use for the job.
   */
  protected Class<? extends Mapper> getMapperClass()
      throws ClassNotFoundException {
    return this.mapperClass;
  }

  /**
   * @return the inputformat class to use for the job.
   */
  protected Class<? extends InputFormat> getInputFormatClass()
      throws ClassNotFoundException {
    return this.inputFormatClass;
  }

  /**
   * @return the outputformat class to use for the job.
   */
  protected Class<? extends OutputFormat> getOutputFormatClass()
      throws ClassNotFoundException {
    return this.outputFormatClass;
  }

  /** Set the OutputFormat class to use for this job. */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
    this.outputFormatClass = cls;
  }

  /** Set the InputFormat class to use for this job. */
  public void setInputFormatClass(Class<? extends InputFormat> cls) {
    this.inputFormatClass = cls;
  }

  /** Set the Mapper class to use for this job. */
  public void setMapperClass(Class<? extends Mapper> cls) {
    this.mapperClass = cls;
  }

  /**
   * Set the SqoopOptions configuring this job.
   * Note that {@link #isHCatJob} keeps the value computed at construction.
   */
  public void setOptions(SqoopOptions opts) {
    this.options = opts;
  }

  /**
   * Put jar files required by Sqoop into the DistributedCache.
   *
   * <p>The collected jar URLs are appended to whatever is already stored
   * under ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM; empty elements
   * of the existing value are dropped and reported through
   * {@link #warn(String)}.</p>
   *
   * @param job the Job being submitted.
   * @param mgr the ConnManager to use; may be null.
   * @throws IOException if the local FileSystem cannot be obtained.
   */
  protected void cacheJars(Job job, ConnManager mgr)
      throws IOException {
    if (options.isSkipDistCache()) {
      LOG.info("Not adding sqoop jars to distributed cache as requested");
      return;
    }

    Configuration conf = job.getConfiguration();
    FileSystem fs = FileSystem.getLocal(conf);
    Set<String> localUrls = new HashSet<String>();

    addToCache(Jars.getSqoopJarPath(), fs, localUrls);
    if (null != mgr) {
      addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
      addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
    }

    SqoopTool tool = this.options.getActiveSqoopTool();
    if (null != tool) {
      // Make sure the jar for the tool itself is on the classpath. (In case
      // this is a third-party plugin tool.)
      addToCache(Jars.getJarPathForClass(tool.getClass()), fs, localUrls);
      List<String> toolDeps = tool.getDependencyJars();
      if (null != toolDeps) {
        for (String depFile : toolDeps) {
          addToCache(depFile, fs, localUrls);
        }
      }
    }

    // Add anything in $SQOOP_HOME/lib, if this is set.
    String sqoopHome = System.getenv("SQOOP_HOME");
    if (null != sqoopHome) {
      File sqoopHomeFile = new File(sqoopHome);
      File sqoopLibFile = new File(sqoopHomeFile, "lib");
      if (sqoopLibFile.exists()) {
        addDirToCache(sqoopLibFile, fs, localUrls);
      }
    } else {
      LOG.warn("SQOOP_HOME is unset. May not be able to find "
          + "all job dependencies.");
    }

    // If the user runs an import into Hive as a Parquet file,
    // add anything in $HIVE_HOME/lib.
    if (options.doHiveImport()
        && (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile)) {
      String hiveHome = options.getHiveHome();
      if (null != hiveHome) {
        File hiveHomeFile = new File(hiveHome);
        File hiveLibFile = new File(hiveHomeFile, "lib");
        if (hiveLibFile.exists()) {
          addDirToCache(hiveLibFile, fs, localUrls);
        }
      } else {
        LOG.warn("HIVE_HOME is unset. Cannot add hive libs as dependencies.");
      }
    }

    String tmpjars =
        conf.get(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM);
    StringBuilder sb = new StringBuilder();

    // If we didn't put anything in our set, then there's nothing to cache.
    if (localUrls.isEmpty()
        && (org.apache.commons.lang.StringUtils.isEmpty(tmpjars))) {
      return;
    }

    // Carry over the jars that were already configured, skipping (and
    // reporting) empty elements such as those produced by "a,,b".
    if (null != tmpjars) {
      String[] tmpjarsElements = tmpjars.split(",");
      for (String jarElement : tmpjarsElements) {
        if (jarElement.isEmpty()) {
          warn("Empty input is invalid and was removed from tmpjars.");
        } else {
          sb.append(jarElement);
          sb.append(",");
        }
      }
    }

    // Each carried-over element was followed by a comma; when nothing else
    // will be appended, the trailing comma has to go.
    int lastComma = sb.lastIndexOf(",");
    if (localUrls.isEmpty() && lastComma >= 0) {
      sb.deleteCharAt(lastComma);
    }

    // Add these to the 'tmpjars' array, which the MR JobSubmitter
    // will upload to HDFS and put in the DistributedCache libjars.
    sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
    conf.set(ConfigurationConstants.MAPRED_DISTCACHE_CONF_PARAM, sb.toString());
  }

  /**
   * Log a warning. Kept as a separate overridable method so subclasses can
   * intercept the warnings emitted by {@link #cacheJars}.
   *
   * @param message the text to log at WARN level.
   */
  protected void warn(String message) {
    LOG.warn(message);
  }

  /**
   * Qualify a single path against the given FileSystem and record it in the
   * set of URLs to cache. A null path is ignored.
   */
  private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
    if (null == file) {
      return;
    }

    Path p = new Path(file);
    String qualified = p.makeQualified(fs).toString();
    LOG.debug("Adding to job classpath: " + qualified);
    localUrls.add(qualified);
  }

  /**
   * Add the .jar elements of a directory to the DCache classpath,
   * nonrecursively. A null directory, or one whose contents cannot be
   * listed, contributes nothing.
   */
  private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
    if (null == dir) {
      return;
    }

    // File.listFiles() returns null when 'dir' is not a directory or when an
    // I/O error occurs; callers only check exists(), so guard here.
    File[] entries = dir.listFiles();
    if (null == entries) {
      LOG.warn("Unable to list files in " + dir
          + "; not adding its jars to the job classpath.");
      return;
    }

    for (File libfile : entries) {
      if (libfile.exists() && !libfile.isDirectory()
          && libfile.getName().endsWith("jar")) {
        addToCache(libfile.toString(), fs, localUrls);
      }
    }
  }

  /**
   * If jars must be loaded into the local environment, do so here.
   *
   * <p>The jar is only loaded when either "mapreduce.jobtracker.address" or
   * "mapred.job.tracker" is set to "local" in the configuration.</p>
   *
   * @param conf the configuration to inspect.
   * @param ormJarFile the jar file passed to ClassLoaderStack.addJarFile().
   * @param tableClassName the class name passed to
   *     ClassLoaderStack.addJarFile().
   */
  protected void loadJars(Configuration conf, String ormJarFile,
      String tableClassName) throws IOException {

    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
        || "local".equals(conf.get("mapred.job.tracker"));
    if (isLocal) {
      // If we're using the LocalJobRunner, then instead of using the compiled
      // jar file as the job source, we're running in the current thread. Push
      // on another classloader that loads from that jar in addition to
      // everything currently on the classpath.
      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
          tableClassName);
    }
  }

  /**
   * If any classloader was invoked by loadJars, free it here.
   */
  protected void unloadJars() {
    if (null != this.prevClassLoader) {
      // unload the special classloader for this jar.
      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
    }
  }

  /**
   * Configure the inputformat to use for the job.
   * This base implementation only installs the class returned by
   * {@link #getInputFormatClass()}; the table and column arguments are
   * unused here.
   */
  protected void configureInputFormat(Job job, String tableName,
      String tableClassName, String splitByCol)
      throws ClassNotFoundException, IOException {
    //TODO: 'splitByCol' is import-job specific; lift it out of this API.
    Class<? extends InputFormat> ifClass = getInputFormatClass();
    LOG.debug("Using InputFormat: " + ifClass);
    job.setInputFormatClass(ifClass);
  }

  /**
   * Configure the output format to use for the job.
   * This base implementation only installs the class returned by
   * {@link #getOutputFormatClass()}; the table arguments are unused here.
   */
  protected void configureOutputFormat(Job job, String tableName,
      String tableClassName) throws ClassNotFoundException, IOException {
    Class<? extends OutputFormat> ofClass = getOutputFormatClass();
    LOG.debug("Using OutputFormat: " + ofClass);
    job.setOutputFormatClass(ofClass);
  }

  /**
   * Set the mapper class implementation to use in the job,
   * as well as any related configuration (e.g., map output types).
   * This base implementation only installs the class returned by
   * {@link #getMapperClass()}.
   */
  protected void configureMapper(Job job, String tableName,
      String tableClassName) throws ClassNotFoundException, IOException {
    job.setMapperClass(getMapperClass());
  }

  /**
   * Configure the number of map/reduce tasks to use in the job,
   * returning the number of map tasks for backward compatibility.
   */
  protected int configureNumTasks(Job job) throws IOException {
    int numMapTasks = configureNumMapTasks(job);
    configureNumReduceTasks(job);
    return numMapTasks;
  }

  /**
   * Configure the number of map tasks to use in the job.
   * A requested count below 1 is replaced by
   * SqoopOptions.DEFAULT_NUM_MAPPERS and a warning is logged.
   *
   * @return the number of map tasks that was set on the job.
   */
  protected int configureNumMapTasks(Job job) throws IOException {
    int numMapTasks = options.getNumMappers();
    if (numMapTasks < 1) {
      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
    }
    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
    return numMapTasks;
  }

  /**
   * Configure the number of reduce tasks to use in the job.
   * This base implementation always configures a map-only job.
   *
   * @return the number of reduce tasks that was set on the job (0).
   */
  protected int configureNumReduceTasks(Job job) throws IOException {
    job.setNumReduceTasks(0);
    return 0;
  }

  /** Set the main job that will be run. */
  protected void setJob(Job job) {
    mrJob = job;
  }

  /**
   * @return the main MapReduce job that is being run, or null if no
   * job has started.
   */
  public Job getJob() {
    return mrJob;
  }

  /**
   * Create new Job object in unified way for all types of jobs.
   *
   * @param configuration Hadoop configuration that should be used
   * @return New job object, created object won't be persisted in the instance
   */
  public Job createJob(Configuration configuration) throws IOException {
    // Put the SqoopOptions into job if requested
    if (configuration.getBoolean(SERIALIZE_SQOOPOPTIONS,
        SERIALIZE_SQOOPOPTIONS_DEFAULT)) {
      putSqoopOptionsToConfiguration(options, configuration);
    }

    return new Job(configuration);
  }

  /**
   * Iterates over serialized form of SqoopOptions and put them into Configuration
   * object. Every key is stored with the prefix "sqoop.opt.".
   *
   * @param opts SqoopOptions that should be serialized
   * @param configuration Target configuration object
   */
  public void putSqoopOptionsToConfiguration(SqoopOptions opts,
      Configuration configuration) {
    for (Map.Entry<Object, Object> e : opts.writeProperties().entrySet()) {
      String key = (String) e.getKey();
      String value = (String) e.getValue();

      // We don't need to do if(value is empty) because that is already done
      // for us by the SqoopOptions.writeProperties() method.
      configuration.set("sqoop.opt." + key, value);
    }
  }

  /**
   * Actually run the MapReduce job.
   *
   * @return the result of {@code job.waitForCompletion(true)}.
   */
  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
      InterruptedException {
    return job.waitForCompletion(true);
  }

  /**
   * Display a notice on the log that the current MapReduce job has
   * been retired, and thus Counters are unavailable.
   * @param log the Log to display the info to.
   */
  protected void displayRetiredJobNotice(Log log) {
    log.info("The MapReduce job has already been retired. Performance");
    log.info("counters are unavailable. To get this information, ");
    log.info("you will need to enable the completed job store on ");
    log.info("the jobtracker with:");
    log.info("mapreduce.jobtracker.persist.jobstatus.active = true");
    log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1");
    log.info("A jobtracker restart is required for these settings");
    log.info("to take effect.");
  }

  /**
   * Save interesting options to constructed job. Goal here is to propagate some
   * of them to the job itself, so that they can be easily accessed. We're
   * propagating only interesting global options (like verbose flag).
   *
   * @param job Destination job to save options
   */
  protected void propagateOptionsToJob(Job job) {
    Configuration configuration = job.getConfiguration();

    // So far, propagate only verbose flag
    configuration.setBoolean(PROPERTY_VERBOSE, options.getVerbose());
  }
}
