/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import static org.apache.sqoop.manager.JdbcDrivers.SQLSERVER;

import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.JdbcExportJob;
import org.apache.sqoop.mapreduce.JdbcUpdateExportJob;
import org.apache.sqoop.mapreduce.JdbcUpsertExportJob;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerInputFormat;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerUpsertOutputFormat;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
/**
* Manages connections to SQLServer databases. Requires the SQLServer JDBC
* driver.
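 *
 * <p>A minimal command-line sketch (host, database, table, and hint values
 * are hypothetical):
 * <pre>{@code
 * sqoop import \
 *   --connect "jdbc:sqlserver://dbhost:1433;databaseName=sales" \
 *   --table orders \
 *   -- --schema dbo --table-hints NOLOCK
 * }</pre>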
*/
public class SQLServerManager extends InformationSchemaManager {
public static final String SCHEMA = "schema";
public static final String TABLE_HINTS = "table-hints";
public static final String TABLE_HINTS_PROP
= "org.apache.sqoop.manager.sqlserver.table.hints";
public static final Log LOG = LogFactory.getLog(
SQLServerManager.class.getName());
// Option to allow inserts on identity columns
public static final String IDENTITY_INSERT = "identity-insert";
public static final String IDENTITY_INSERT_PROP =
"org.apache.sqoop.manager.sqlserver.table.identity";
// Define SQL Server specific types that are not covered by parent classes
private static final int DATETIMEOFFSET = -155;
/**
* Schema name that we will use.
*/
private String schema;
/**
* Optional table hints to use.
*/
private String tableHints;
/**
* Whether to allow identity inserts.
*/
private boolean identityInserts;
final SqlServerManagerContextConfigurator formatConfigurator;
public SQLServerManager(final SqoopOptions opts) {
this(SQLSERVER.getDriverClass(), opts);
}
public SQLServerManager(final String driver, final SqoopOptions opts) {
this(driver, opts, new SqlServerManagerContextConfigurator());
}
  public SQLServerManager(final String driver, final SqoopOptions opts,
      final SqlServerManagerContextConfigurator configurator) {
super(driver, opts);
this.formatConfigurator = configurator;
// Try to parse extra arguments
try {
parseExtraArgs(opts.getExtraArgs());
} catch (ParseException e) {
throw new RuntimeException("Can't parse extra arguments", e);
}
}
/**
* Resolve MS SQL Server specific database type to the Java type that should
* contain it.
* @param sqlType sql type
* @return the name of a Java type to hold the sql datatype, or null if none.
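 *
 * <p>For example (a sketch; {@code manager} is a hypothetical instance and
 * -155 is the DATETIMEOFFSET vendor code defined above):
 * <pre>{@code
 * manager.toJavaType(-155);          // returns "String"
 * manager.toJavaType(Types.INTEGER); // falls back to the parent mapping
 * }</pre>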
*/
@Override
public String toJavaType(int sqlType) {
String javaType;
if (sqlType == DATETIMEOFFSET) {
      /*
       * We cannot use java.sql.Timestamp to represent the SQL Server
       * datetimeoffset type because it does not preserve the time zone
       * offset; map it to String instead, which works for both import
       * and export.
       */
javaType = "String";
} else {
      // Otherwise fall back to the parent mapping, which may return null.
javaType = super.toJavaType(sqlType);
}
return javaType;
}
/**
* {@inheritDoc}
*/
@Override
public void importTable(org.apache.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
// We're the correct connection manager
context.setConnManager(this);
// Propagate table hints to job
Configuration configuration = context.getOptions().getConf();
if (tableHints != null) {
configuration.set(TABLE_HINTS_PROP, tableHints);
}
String splitColumn = getSplitColumn(context.getOptions(), context.getTableName());
context.setInputFormat(SqlServerInputFormat.class);
formatConfigurator.configureContextForImport(context, splitColumn);
super.importTable(context);
}
/**
* Export data stored in HDFS into a table in a database.
*/
@Override
  public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
      throws IOException, ExportException {
context.setConnManager(this);
// Propagate table hints to job
Configuration configuration = context.getOptions().getConf();
if (tableHints != null) {
configuration.set(TABLE_HINTS_PROP, tableHints);
}
// Propagate whether to allow identity inserts to job
configuration.setBoolean(IDENTITY_INSERT_PROP, identityInserts);
formatConfigurator.configureContextForExport(context);
    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
        context.getOutputFormatClass(),
        getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
  /**
   * {@inheritDoc}
   */
  @Override
public void updateTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
boolean runAsExportJob = formatConfigurator.configureContextForUpdate(context, this);
if (runAsExportJob) {
      JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
          null, context.getOutputFormatClass(),
          getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
} else {
super.updateTable(context);
}
}
  /**
   * {@inheritDoc}
   */
  @Override
public void upsertTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
// Propagate table hints to job
Configuration configuration = context.getOptions().getConf();
if (tableHints != null) {
configuration.set(TABLE_HINTS_PROP, tableHints);
}
    JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
        SqlServerUpsertOutputFormat.class,
        getParquetJobConfigurator().createParquetExportJobConfigurator());
exportJob.runExport();
}
  /**
   * {@inheritDoc}
   */
  @Override
public void configureDbOutputColumns(SqoopOptions options) {
if (options.getUpdateMode() == SqoopOptions.UpdateMode.UpdateOnly) {
super.configureDbOutputColumns(options);
} else {
// We're in upsert mode. We need to explicitly set
// the database output column ordering in the codeGenerator.
options.setDbOutputColumns(getColumnNames(options.getTableName()));
}
}
/**
 * SQL Server does not support a CURRENT_TIMESTAMP() function. Instead it
 * provides the CURRENT_TIMESTAMP keyword, which resolves to the current
 * timestamp of the database system.
*/
@Override
public String getCurTimestampQuery() {
return "SELECT CURRENT_TIMESTAMP";
}
@Override
protected String getListDatabasesQuery() {
return "SELECT NAME FROM SYS.DATABASES";
}
@Override
protected String getSchemaQuery() {
if (schema == null) {
return "SELECT SCHEMA_NAME()";
}
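    // Return the schema as a quoted literal so callers can embed this
    // value as a scalar subquery (see getListColumnsQuery).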
return "'" + schema + "'";
}
@Override
protected String getListColumnsQuery(String tableName) {
return
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
+ "WHERE TABLE_SCHEMA = (" + getSchemaQuery() + ") "
+ " AND TABLE_NAME = N'" + tableName + "' "
+ " ORDER BY ORDINAL_POSITION";
}
@Override
protected String getPrimaryKeyQuery(String tableName) {
return
"SELECT kcu.COLUMN_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc, "
+ " INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu "
+ "WHERE tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA "
+ " AND tc.TABLE_NAME = kcu.TABLE_NAME "
+ " AND tc.CONSTRAINT_SCHEMA = kcu.CONSTRAINT_SCHEMA "
+ " AND tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME "
+ " AND tc.TABLE_SCHEMA = (" + getSchemaQuery() + ") "
+ " AND tc.TABLE_NAME = N'" + tableName + "' "
+ " AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'";
}
@Override
public String escapeColName(String colName) {
return escapeObjectName(colName);
}
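  /**
   * Escape a table name, qualifying it with the configured schema when one
   * is set. For example, with schema "dbo", {@code escapeTableName("emp")}
   * returns {@code [dbo].[emp]}.
   */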
@Override
public String escapeTableName(String tableName) {
// Return table name including schema if requested
if (schema != null && !schema.isEmpty()) {
return escapeObjectName(schema) + "." + escapeObjectName(tableName);
}
return escapeObjectName(tableName);
}
@Override
public boolean escapeTableNameOnExport() {
return true;
}
/**
* Escape database object name (database, table, column, schema).
*
* @param objectName Object name in database
* @return Escaped variant of the name
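 *
 * <p>For example, {@code escapeObjectName("My Table")} returns
 * {@code [My Table]}.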
*/
public String escapeObjectName(String objectName) {
if (null == objectName) {
return null;
}
return "[" + objectName + "]";
}
/**
* Parse extra arguments.
*
* @param args Extra arguments array
 * @throws ParseException if the extra arguments cannot be parsed
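 *
 * <p>For example, extra arguments such as
 * {@code {"--schema", "dbo", "--table-hints", "NOLOCK"}} set the schema
 * and the table hints applied below.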
*/
void parseExtraArgs(String[] args) throws ParseException {
// No-op when no extra arguments are present
if (args == null || args.length == 0) {
return;
}
    // We do not need the extended abilities of SqoopParser, so we use the
    // Gnu parser instead.
CommandLineParser parser = new GnuParser();
CommandLine cmdLine = parser.parse(getExtraOptions(), args, true);
// Apply extra options
if (cmdLine.hasOption(SCHEMA)) {
String schemaName = cmdLine.getOptionValue(SCHEMA);
LOG.info("We will use schema " + schemaName);
this.schema = schemaName;
}
// Apply table hints
if (cmdLine.hasOption(TABLE_HINTS)) {
String hints = cmdLine.getOptionValue(TABLE_HINTS);
LOG.info("Sqoop will use following table hints for data transfer: "
+ hints);
this.tableHints = hints;
}
if (cmdLine.hasOption(SqlServerManagerContextConfigurator.RESILIENT_OPTION)) {
      LOG.warn("Sqoop will use resilient operations! For imports, the "
          + "split-by column must also be specified, unique, and in "
          + "ascending order.");
}
identityInserts = cmdLine.hasOption(IDENTITY_INSERT);
}
/**
* Create related options for SQL Server extra parameters.
*
 * @return the related options for SQL Server specific extra parameters
*/
@SuppressWarnings("static-access")
private RelatedOptions getExtraOptions() {
// Connection args (common)
RelatedOptions extraOptions =
new RelatedOptions("SQL Server extra options:");
extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
.withDescription("Optional schema name")
.withLongOpt(SCHEMA).create());
extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
.withDescription("Optional table hints to use")
.withLongOpt(TABLE_HINTS).create());
extraOptions.addOption(OptionBuilder
.withDescription("Allow identity inserts")
.withLongOpt(IDENTITY_INSERT).create());
extraOptions.addOption(OptionBuilder
.withLongOpt(SqlServerManagerContextConfigurator.RESILIENT_OPTION).create());
return extraOptions;
}
/**
 * Launch a MapReduce job via DataDrivenImportJob that reads the data with
 * SQLServerDBInputFormat, which handles connection failures during
 * free-form query imports.
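 *
 * <p>A free-form query sketch (connect string, query, and paths are
 * hypothetical):
 * <pre>{@code
 * sqoop import \
 *   --connect "jdbc:sqlserver://dbhost:1433;databaseName=sales" \
 *   --query "SELECT id, total FROM orders WHERE $CONDITIONS" \
 *   --split-by id --target-dir /data/orders
 * }</pre>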
*/
  @Override
  public void importQuery(org.apache.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
String splitColumn = getSplitColumn(context.getOptions(), context.getTableName());
formatConfigurator.configureContextForImport(context, splitColumn);
super.importQuery(context);
}
}