blob: 5f0f13f7942881333e18d0fc0ce1483b9a301a79 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import static org.apache.sqoop.manager.JdbcDrivers.NETEZZA;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.AsyncSqlOutputFormat;
import org.apache.sqoop.mapreduce.netezza.NetezzaDataDrivenDBInputFormat;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.config.ConfigurationHelper;
import org.apache.sqoop.util.ExportException;
import org.apache.sqoop.util.ImportException;
/**
* Manages connections to Netezza databases.
*/
public class NetezzaManager extends GenericJdbcManager {
public static final Log LOG = LogFactory.getLog(NetezzaManager.class
.getName());
// set to true after we warn the user that we can use direct fastpath.
protected static boolean directModeWarningPrinted = false;
// set to true after we warn the user that they should consider using
// batching.
protected static boolean batchModeWarningPrinted = false;
public static final String NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT =
"netezza.dataslice.aligned.access";
public static final String NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG =
"partitioned-access";
public NetezzaManager(final SqoopOptions opts) {
super(NETEZZA.getDriverClass(), opts);
}
@Override
public String escapeColName(String colName) {
return escapeIdentifier(colName);
}
@Override
public String escapeTableName(String tableName) {
return escapeIdentifier(tableName);
}
protected String escapeIdentifier(String identifier) {
if (identifier == null) {
return null;
}
return "\"" + identifier.replace("\"", "\"\"") + "\"";
}
@Override
public void close() throws SQLException {
if (this.hasOpenConnection()) {
this.getConnection().rollback(); // Rollback any changes
}
super.close();
}
@Override
public void importTable(org.apache.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
context.setConnManager(this);
// The user probably should have requested --direct to invoke external
// table option.
// Display a warning informing them of this fact.
if (!NetezzaManager.directModeWarningPrinted) {
LOG.warn("It looks like you are importing from Netezza.");
LOG.warn("This transfer can be faster! Use the --direct");
LOG.warn("option to exercise a Netezza-specific fast path.");
NetezzaManager.directModeWarningPrinted = true; // don't display this
// twice.
}
try {
handleNetezzaImportExtraArgs(context);
} catch (ParseException pe) {
throw (ImportException) new ImportException(pe.getMessage(), pe);
}
// Then run the normal importTable() method.
super.importTable(context);
}
@Override
public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
// The user probably should have requested --direct to invoke external
// table option.
// Display a warning informing them of this fact.
context.setConnManager(this);
if (!NetezzaManager.directModeWarningPrinted) {
LOG.warn("It looks like you are exporting to Netezza.");
LOG.warn("This transfer can be faster! Use the --direct");
LOG.warn("option to exercise a Netezza-specific fast path.");
NetezzaManager.directModeWarningPrinted = true; // don't display this
// twice.
}
// Netezza does not have multi row inserts
if (!options.isBatchMode()) {
if (!NetezzaManager.batchModeWarningPrinted) {
LOG.warn("It looks like you are exporting to Netezza in non-batch ");
LOG.warn("mode. Still this transfer can be made faster! Use the ");
LOG.warn("--batch option to exercise a Netezza-specific fast path.");
LOG.warn("Forcing records per statement to 1 in non batch mode");
NetezzaManager.batchModeWarningPrinted = true; // don't display this
// twice.
}
context.getOptions().getConf()
.setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 1);
}
// options.setBatchMode(true);
// TODO Force batchmode?
super.exportTable(context);
}
@Override
public void updateTable(org.apache.sqoop.manager.ExportJobContext context)
throws IOException, ExportException {
if (options.getNumMappers() > 1) {
String msg = "Netezza update with multiple mappers can lead to "
+ "inconsistencies - Please set num-mappers option to 1 in the SQOOP "
+ "command line for update jobs with Netezza and SQOOP";
throw new ExportException(msg);
}
if (!options.isBatchMode()) {
if (!NetezzaManager.batchModeWarningPrinted) {
LOG.warn("It looks like you are exporting to Netezza in non-batch ");
LOG.warn("mode. Still this transfer can be made faster! Use the ");
LOG.warn("--batch option to exercise a Netezza-specific fast path.");
LOG.warn("Forcing records per statement to 1 in non batch mode");
NetezzaManager.batchModeWarningPrinted = true; // don't display this
// twice.
}
context.getOptions().getConf()
.setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 1);
}
super.updateTable(context);
}
@Override
public boolean supportsStagingForExport() {
return true;
}
@Override
protected String getCurTimestampQuery() {
return "SELECT CURRENT_TIMESTAMP";
}
protected RelatedOptions getNetezzaExtraOpts() {
RelatedOptions netezzaOpts = new RelatedOptions("Netezza options");
netezzaOpts.addOption(OptionBuilder
.withArgName(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT).hasArg()
.withDescription("Data slice aligned import")
.withLongOpt(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG).create());
return netezzaOpts;
}
private void handleNetezzaImportExtraArgs(ImportJobContext context)
throws ParseException {
SqoopOptions opts = context.getOptions();
Configuration conf = opts.getConf();
String[] extraArgs = opts.getExtraArgs();
conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false);
if (extraArgs != null && extraArgs.length > 0
&& ConfigurationHelper.getConfNumMaps(conf) > 1) {
RelatedOptions netezzaOpts = getNetezzaExtraOpts();
CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
if (cmdLine.hasOption(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG)) {
conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
context.setInputFormat(NetezzaDataDrivenDBInputFormat.class);
}
}
}
}