blob: dbe89349c17ca86c3985c279cb3fe150e3bcab0e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.tool;
import static org.apache.sqoop.manager.JdbcDrivers.DB2;
import static org.apache.sqoop.manager.JdbcDrivers.HSQLDB;
import static org.apache.sqoop.manager.JdbcDrivers.MYSQL;
import static org.apache.sqoop.manager.JdbcDrivers.ORACLE;
import static org.apache.sqoop.manager.JdbcDrivers.POSTGRES;
import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.metastore.GenericJobStorage;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.JobStorage;
import com.cloudera.sqoop.metastore.JobStorageFactory;

import org.apache.sqoop.manager.JdbcDrivers;
import org.apache.sqoop.util.LoggingUtils;
/**
 * Tool that creates and executes saved jobs.
 *
 * <p>Exactly one job operation (create, delete, exec, list or show) is
 * selected on the command line. {@link #run(SqoopOptions)} opens the
 * configured {@link JobStorage} and dispatches to the matching handler.</p>
 */
public class JobTool extends com.cloudera.sqoop.tool.BaseSqoopTool {

  public static final Log LOG = LogFactory.getLog(
      JobTool.class.getName());

  /** Separator between job-level arguments and child-tool arguments. */
  private static final String DASH_STR = "--";

  /**
   * JDBC drivers accepted for the job metastore. An EnumSet iterates in enum
   * declaration order, and the first driver whose scheme prefix matches the
   * connect string is selected by {@link #chooseDriverType(String)}.
   * Wrapped unmodifiable so the shared static set cannot be altered.
   */
  private static final Set<JdbcDrivers> SUPPORTED_DRIVERS =
      Collections.unmodifiableSet(
          EnumSet.of(HSQLDB, MYSQL, ORACLE, POSTGRES, DB2, SQLSERVER));

  /** Job operations selectable from the command line. */
  private enum JobOp {
    JobCreate,
    JobDelete,
    JobExecute,
    JobList,
    JobShow,
  }

  /** Metastore connection settings handed to the JobStorageFactory. */
  private Map<String, String> storageDescriptor;
  /** Name of the job to operate on; not set for the list operation. */
  private String jobName;
  /** Operation chosen in {@link #applyOptions}. */
  private JobOp operation;
  /** Storage opened in {@link #run}; closed before {@code run} returns. */
  private JobStorage storage;

  public JobTool() {
    super("job");
  }

  /**
   * Returns the index of the first "--" element of {@code array}, or -1 if
   * there is none.
   */
  private static int indexOfDoubleDash(String [] array) {
    for (int i = 0; i < array.length; i++) {
      if (DASH_STR.equals(array[i])) {
        return i;
      }
    }
    return -1;
  }

  /**
   * Given an array of strings, return all elements of this
   * array up to (but not including) the first instance of "--".
   * If there is no "--", the input array itself is returned.
   */
  private String [] getElementsUpToDoubleDash(String [] array) {
    int dashIdx = indexOfDoubleDash(array);
    if (dashIdx < 0) {
      // Didn't find any nested '--'.
      return array;
    }
    return Arrays.copyOfRange(array, 0, dashIdx);
  }

  /**
   * Given an array of strings, return the first instance
   * of "--" and all following elements.
   * If no "--" exists, return null.
   */
  private String [] getElementsAfterDoubleDash(String [] array) {
    int dashIdx = indexOfDoubleDash(array);
    if (dashIdx < 0) {
      return null;
    }
    return Arrays.copyOfRange(array, dashIdx, array.length);
  }

  /**
   * Parses and validates {@code childArgv} for {@code childTool}.
   *
   * <p>Only the arguments before an embedded "--" are parsed. The "--" and
   * everything after it are passed to the child tool unchanged via
   * {@code appendArgs} (null when there is no "--"). Callers read
   * {@code childOptions} afterwards, so {@code parseArguments} is expected
   * to populate it in place.</p>
   *
   * @param childOptions options object to populate
   * @param childTool tool that receives the arguments
   * @param childArgv raw child-tool arguments
   * @return 0 on success; 1 if parsing or validation failed (already logged)
   */
  private int configureChildTool(SqoopOptions childOptions,
      SqoopTool childTool, String [] childArgv) {
    // Everything up to the '--'.
    String [] parseableChildArgv = getElementsUpToDoubleDash(childArgv);
    // The '--' and any subsequent args, stashed off to the side.
    String [] extraChildArgv = getElementsAfterDoubleDash(childArgv);

    try {
      SqoopOptions parsedOptions = childTool.parseArguments(
          parseableChildArgv, null, childOptions, false);
      childTool.appendArgs(extraChildArgv);
      childTool.validateOptions(parsedOptions);
    } catch (ParseException pe) {
      LOG.error("Error parsing arguments to the job-specific tool: ", pe);
      LOG.error("See 'sqoop help <tool>' for usage.");
      return 1;
    } catch (SqoopOptions.InvalidOptionsException e) {
      System.err.println(e.getMessage());
      return 1;
    }

    return 0; // Success.
  }

  /**
   * Creates and saves the job named {@link #jobName}.
   *
   * <p>{@code extraArguments} must contain "--" followed by a tool name and
   * any tool-specific arguments. The named tool is configured against a
   * deep copy of the current Configuration, and the resulting options are
   * saved as the job.</p>
   *
   * @return 0 on success, non-zero on error
   * @throws IOException if the storage layer fails
   */
  private int createJob(SqoopOptions options) throws IOException {
    // Check for null before touching extraArguments. The original computed
    // the dash position first, which made its own null check too late.
    int toolArgPos = (null == extraArguments)
        ? -1
        : getDashPosition(extraArguments) + 1;
    if (toolArgPos < 0 || toolArgPos >= extraArguments.length) {
      LOG.error("No tool specified; cannot create a job.");
      LOG.error("Use: sqoop job --create <job-name> "
          + "-- <tool-name> [tool-args]");
      return 1;
    }

    String jobToolName = extraArguments[toolArgPos];
    SqoopTool jobTool = SqoopTool.getTool(jobToolName);
    if (null == jobTool) {
      LOG.error("No such tool available: " + jobToolName);
      return 1;
    }

    // Create a SqoopOptions and Configuration based on the current one,
    // but deep-copied. This will be populated within the job.
    SqoopOptions jobOptions = new SqoopOptions();
    jobOptions.setConf(new Configuration(options.getConf()));

    // Everything after the tool name goes to the child tool.
    String [] childArgs = Arrays.copyOfRange(extraArguments, toolArgPos + 1,
        extraArguments.length);
    int confRet = configureChildTool(jobOptions, jobTool, childArgs);
    if (0 != confRet) {
      return confRet;
    }

    // Now that the tool is fully configured, materialize the job.
    //TODO(jarcec): Remove the cast when JobData will be moved to apache package
    JobData jobData = new JobData(jobOptions,
        (com.cloudera.sqoop.tool.SqoopTool) jobTool);
    this.storage.create(jobName, jobData);
    return 0; // Success.
  }

  /**
   * Prints the names of all jobs in the storage to stdout.
   *
   * @return always 0
   * @throws IOException if the storage layer fails
   */
  private int listJobs(SqoopOptions opts) throws IOException {
    List<String> jobNames = storage.list();
    System.out.println("Available jobs:");
    for (String name : jobNames) {
      System.out.println("  " + name);
    }
    return 0;
  }

  /**
   * Deletes the job named {@link #jobName} from the storage.
   *
   * @return always 0
   * @throws IOException if the storage layer fails
   */
  private int deleteJob(SqoopOptions opts) throws IOException {
    this.storage.delete(jobName);
    return 0;
  }

  /**
   * Merges the extra arguments stored with a saved job with any arguments
   * given after "--" on the current command line.
   *
   * <p>Command-line arguments come first. If the job also has stored extra
   * arguments, a "--" is placed before them unless the command-line
   * arguments already contain one. This keeps the stored arguments in the
   * pass-through section that {@link #configureChildTool} does not
   * parse.</p>
   *
   * @param storedExtraArgs extra arguments saved with the job; may be null
   * @return the argument vector for the child tool; never null
   */
  private String [] buildExecArgv(String [] storedExtraArgs) {
    String [] childArgv = (null == storedExtraArgs)
        ? new String[0]
        : storedExtraArgs;
    if (null == extraArguments) {
      return childArgv;
    }

    // getDashPosition reports "no dash" with a position >= array length.
    int dashPos = getDashPosition(extraArguments);
    if (dashPos >= extraArguments.length) {
      return childArgv;
    }

    String [] cmdLineArgs = Arrays.copyOfRange(extraArguments, dashPos + 1,
        extraArguments.length);
    if (childArgv.length == 0) {
      return cmdLineArgs;
    }

    if (getDashPosition(cmdLineArgs) >= cmdLineArgs.length) {
      // A second dash is not present; add one ahead of the stored args.
      cmdLineArgs = ArrayUtils.addAll(cmdLineArgs, DASH_STR);
    }
    return ArrayUtils.addAll(cmdLineArgs, childArgv);
  }

  /**
   * Runs the saved job named {@link #jobName}.
   *
   * <p>The stored options are cloned, with the stored copy set as the
   * clone's parent. Only the clone is configured with the merged arguments,
   * so the stored options are not overwritten.</p>
   *
   * @return the child tool's exit code, or non-zero on error
   * @throws IOException if the storage layer fails
   */
  private int execJob(SqoopOptions opts) throws IOException {
    JobData data = this.storage.read(jobName);
    if (null == data) {
      LOG.error("No such job: " + jobName);
      return 1;
    }

    SqoopOptions childOpts = data.getSqoopOptions();
    SqoopTool childTool = data.getSqoopTool();

    // Don't overwrite the original SqoopOptions with the
    // arguments; make a child options.
    SqoopOptions clonedOpts = (SqoopOptions) childOpts.clone();
    clonedOpts.setParent(childOpts);

    String [] childArgv = buildExecArgv(childOpts.getExtraArgs());
    int confRet = configureChildTool(clonedOpts, childTool, childArgv);
    if (0 != confRet) {
      return confRet;
    }

    return childTool.run(clonedOpts);
  }

  /**
   * Prints the tool name and stored option properties of the job named
   * {@link #jobName} to stdout.
   *
   * @return 0 on success, 1 if the job does not exist
   * @throws IOException if the storage layer fails
   */
  private int showJob(SqoopOptions opts) throws IOException {
    JobData data = this.storage.read(jobName);
    if (null == data) {
      LOG.error("No such job: " + jobName);
      return 1;
    }

    SqoopOptions childOpts = data.getSqoopOptions();
    SqoopTool childTool = data.getSqoopTool();

    System.out.println("Job: " + jobName);
    System.out.println("Tool: " + childTool.getToolName());

    System.out.println("Options:");
    System.out.println("----------------------------");
    Properties props = childOpts.writeProperties();
    for (Map.Entry<Object, Object> entry : props.entrySet()) {
      System.out.println(entry.getKey().toString() + " = " + entry.getValue());
    }

    // TODO: This does not show entries in the Configuration
    // (SqoopOptions.getConf()) which were stored as different from the
    // default.

    return 0;
  }

  /**
   * Opens the job storage described by the metastore options and performs
   * the selected operation. The storage is closed before returning.
   *
   * @return 0 on success, non-zero on error
   */
  @Override
  public int run(SqoopOptions options) {
    // Get a JobStorage instance to use to materialize this job.
    JobStorageFactory ssf = new JobStorageFactory(options.getConf());
    this.storage = ssf.getJobStorage(storageDescriptor);
    if (null == this.storage) {
      LOG.error("There is no JobStorage implementation available");
      LOG.error("that can read your specified storage descriptor.");
      LOG.error("Don't know where to save this job info! You may");
      LOG.error("need to specify the connect string with --meta-connect.");
      return 1;
    }

    try {
      // Open the storage layer.
      this.storage.open(this.storageDescriptor);

      // And now determine what operation to perform with it.
      switch (operation) {
      case JobCreate:
        return createJob(options);
      case JobDelete:
        return deleteJob(options);
      case JobExecute:
        return execJob(options);
      case JobList:
        return listJobs(options);
      case JobShow:
        return showJob(options);
      default:
        LOG.error("Undefined job operation: " + operation);
        return 1;
      }
    } catch (IOException ioe) {
      LOG.error("I/O error performing job operation: "
          + StringUtils.stringifyException(ioe));
      return 1;
    } finally {
      if (null != this.storage) {
        try {
          storage.close();
        } catch (IOException ioe) {
          LOG.warn("IOException closing JobStorage: "
              + StringUtils.stringifyException(ioe));
        }
      }
    }
  }

  /** Configure the command-line arguments we expect to receive. */
  @Override
  public void configureOptions(ToolOptions toolOptions) {
    toolOptions.addUniqueOptions(getJobOptions());
  }

  /**
   * Reads the parsed command line. Handles verbose and help, builds the
   * metastore storage descriptor, and records the selected job operation
   * and job name.
   *
   * @throws InvalidOptionsException when help was requested (after printing
   *     it), or when the metastore options are unusable
   */
  @Override
  public void applyOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {

    if (in.hasOption(VERBOSE_ARG)) {
      LoggingUtils.setDebugLevel();
      LOG.debug("Enabled debug logging.");
    }

    if (in.hasOption(HELP_ARG)) {
      ToolOptions toolOpts = new ToolOptions();
      configureOptions(toolOpts);
      printHelp(toolOpts);
      throw new InvalidOptionsException("");
    }

    this.storageDescriptor = new TreeMap<String, String>();

    applyMetastoreOptions(in, out);

    // These are generated via an option group; exactly one
    // of this exhaustive list will always be selected.
    if (in.hasOption(JOB_CMD_CREATE_ARG)) {
      this.operation = JobOp.JobCreate;
      this.jobName = in.getOptionValue(JOB_CMD_CREATE_ARG);
    } else if (in.hasOption(JOB_CMD_DELETE_ARG)) {
      this.operation = JobOp.JobDelete;
      this.jobName = in.getOptionValue(JOB_CMD_DELETE_ARG);
    } else if (in.hasOption(JOB_CMD_EXEC_ARG)) {
      this.operation = JobOp.JobExecute;
      this.jobName = in.getOptionValue(JOB_CMD_EXEC_ARG);
    } else if (in.hasOption(JOB_CMD_LIST_ARG)) {
      this.operation = JobOp.JobList;
    } else if (in.hasOption(JOB_CMD_SHOW_ARG)) {
      this.operation = JobOp.JobShow;
      this.jobName = in.getOptionValue(JOB_CMD_SHOW_ARG);
    }
  }

  /**
   * Fills {@link #storageDescriptor} with the metastore driver class,
   * connect string, username and password. Each setting uses the
   * command-line value when present and otherwise falls back to
   * {@code out}.
   *
   * @throws InvalidOptionsException if the connect string is missing or
   *     matches no supported driver
   */
  private void applyMetastoreOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {
    String metaConnectString = in.hasOption(STORAGE_METASTORE_ARG)
        ? in.getOptionValue(STORAGE_METASTORE_ARG)
        : out.getMetaConnectStr();
    // Resolve the driver first so an unusable connect string fails before
    // anything is written to the descriptor.
    String metaDriverClass = chooseDriverType(metaConnectString);
    this.storageDescriptor.put(GenericJobStorage.META_DRIVER_KEY,
        metaDriverClass);
    this.storageDescriptor.put(GenericJobStorage.META_CONNECT_KEY,
        metaConnectString);

    String metaUsernameString = in.hasOption(METASTORE_USER_ARG)
        ? in.getOptionValue(METASTORE_USER_ARG)
        : out.getMetaUsername();
    this.storageDescriptor.put(GenericJobStorage.META_USERNAME_KEY,
        metaUsernameString);

    String metaPasswordString = in.hasOption(METASTORE_PASS_ARG)
        ? in.getOptionValue(METASTORE_PASS_ARG)
        : out.getMetaPassword();
    this.storageDescriptor.put(GenericJobStorage.META_PASSWORD_KEY,
        metaPasswordString);
  }

  /**
   * Returns the JDBC driver class of the first supported driver whose scheme
   * prefix starts {@code metaConnectString}.
   *
   * @throws InvalidOptionsException if the connect string is null or no
   *     supported driver matches it
   */
  private String chooseDriverType(String metaConnectString)
      throws InvalidOptionsException {
    if (null == metaConnectString) {
      // Previously this dereferenced null; fail with a usable message instead.
      throw new InvalidOptionsException(
          "No metastore connect string available; specify one with --"
          + STORAGE_METASTORE_ARG);
    }
    for (JdbcDrivers driver : SUPPORTED_DRIVERS) {
      if (metaConnectString.startsWith(driver.getSchemePrefix())) {
        return driver.getDriverClass();
      }
    }
    throw new InvalidOptionsException(
        "current meta-connect scheme not compatible with metastore");
  }

  /**
   * Checks that an operation (and, except for list, a job name) was given.
   * For create, it also checks that extra arguments are present. Any
   * arguments before the "--" must be recognized.
   *
   * @throws InvalidOptionsException if any check fails
   */
  @Override
  public void validateOptions(SqoopOptions options)
      throws InvalidOptionsException {

    if (null == operation
        || (null == this.jobName && operation != JobOp.JobList)) {
      throw new InvalidOptionsException("No job operation specified"
          + HELP_STR);
    }

    if (operation == JobOp.JobCreate) {
      // Check that we have a '--' followed by at least a tool name.
      if (extraArguments == null || extraArguments.length == 0) {
        throw new InvalidOptionsException(
            "Expected: -- <tool-name> [tool-args] "
            + HELP_STR);
      }
    }

    // Guard against null so non-create operations cannot hit an NPE here.
    if (null != extraArguments) {
      int dashPos = getDashPosition(extraArguments);
      if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
        throw new InvalidOptionsException(HELP_STR);
      }
    }
  }

  /** Prints usage for the job tool and the generic Hadoop arguments. */
  @Override
  public void printHelp(ToolOptions opts) {
    System.out.println("usage: sqoop " + getToolName()
        + " [GENERIC-ARGS] [JOB-ARGS] [-- [<tool-name>] [TOOL-ARGS]]");
    System.out.println("");

    opts.printHelp();

    System.out.println("");
    System.out.println("Generic Hadoop command-line arguments:");
    System.out.println("(must preceed any tool-specific arguments)");
    ToolRunner.printGenericCommandUsage(System.out);
  }
}