| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.manager; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.GnuParser; |
| import org.apache.commons.cli.OptionBuilder; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.commons.lang.StringEscapeUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob; |
| import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob; |
| |
| import org.apache.sqoop.SqoopOptions; |
| import org.apache.sqoop.cli.RelatedOptions; |
| import org.apache.sqoop.util.ExportException; |
| import org.apache.sqoop.util.ImportException; |
| |
| /** |
| * Manages direct mode transfers from Netezza databases using the external table |
| * options. |
| */ |
| public class DirectNetezzaManager extends NetezzaManager { |
| |
| public static final Log LOG = LogFactory.getLog(DirectNetezzaManager.class |
| .getName()); |
| |
| public static final String NETEZZA_LOG_DIR_OPT = "netezza.log.dir"; |
| public static final String NETEZZA_LOG_DIR_LONG_ARG = "log-dir"; |
| |
| public static final String NETEZZA_ERROR_THRESHOLD_OPT = |
| "netezza.error.threshold"; |
| public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG = |
| "max-errors"; |
| |
| public static final String NETEZZA_CTRL_CHARS_OPT = |
| "netezza.ctrl.chars"; |
| public static final String NETEZZA_CTRL_CHARS_LONG_ARG = |
| "ctrl-chars"; |
| |
| |
| public static final String NETEZZA_CRIN_STRING_OPT = |
| "netezza.crin.string"; |
| public static final String NETEZZA_CRIN_STRING_LONG_ARG = |
| "crin-string"; |
| |
| |
| public static final String NETEZZA_IGNORE_ZERO_OPT = |
| "netezza.ignore.zero"; |
| public static final String NETEZZA_IGNORE_ZERO_LONG_ARG = |
| "ignore-zero"; |
| |
| public static final String NETEZZA_TRUNC_STRING_OPT = |
| "netezza.trunc.string"; |
| public static final String NETEZZA_TRUNC_STRING_LONG_ARG = |
| "trunc-string"; |
| |
| public static final String NETEZZA_TABLE_ENCODING_OPT = |
| "netezza.table.encoding"; |
| public static final String NETEZZA_TABLE_ENCODING_LONG_ARG = |
| "encoding"; |
| |
| |
| private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE = |
| "SELECT 1 FROM _V_TABLE WHERE OWNER= ? " |
| + " AND TABLENAME = ?"; |
| public static final String NETEZZA_NULL_VALUE = |
| "netezza.exttable.null.value"; |
| |
| public DirectNetezzaManager(SqoopOptions opts) { |
| super(opts); |
| try { |
| handleNetezzaExtraArgs(options); |
| } catch (ParseException pe) { |
| throw new RuntimeException(pe.getMessage(), pe); |
| } |
| } |
| |
| private void checkNullValueStrings(String nullStrValue, |
| String nullNonStrValue) throws IOException { |
| |
| if (!StringUtils.equals(nullStrValue, nullNonStrValue)) { |
| throw new IOException( |
| "Detected different values of --input-string and --input-non-string " |
| + "parameters. Netezza direct manager does not support that. Please " |
| + "either use the same values or omit the --direct parameter."); |
| } |
| |
| |
| // Null String values cannot be more 4 chars in length in the case |
| // Netezza external tables. |
| |
| if (nullStrValue != null) { |
| nullStrValue = StringEscapeUtils.unescapeJava(nullStrValue); |
| if (nullStrValue.length() > 4) { |
| throw new IOException( |
| "Null string (and null non string) values for Netezza direct mode" |
| + " manager must be less than 4 characters in length"); |
| } |
| options.getConf().set(NETEZZA_NULL_VALUE, nullStrValue); |
| } |
| } |
| |
| /** |
| * Check Table if it is valid for export. Parse the table like what we do in |
| * Oracle manager |
| * |
| * @throws IOException |
| * @throws ExportException |
| */ |
| private void checkTable() throws IOException, ExportException { |
| String tableOwner = this.options.getUsername(); |
| String tableName = this.options.getTableName(); |
| String shortTableName = tableName; |
| int qualifierIndex = tableName.indexOf('.'); |
| if (qualifierIndex != -1) { |
| tableOwner = tableName.substring(0, qualifierIndex); |
| shortTableName = tableName.substring(qualifierIndex + 1); |
| } |
| Connection conn = null; |
| PreparedStatement ps = null; |
| ResultSet rs = null; |
| |
| try { |
| try { |
| conn = getConnection(); |
| ps = conn.prepareStatement(QUERY_CHECK_DICTIONARY_FOR_TABLE, |
| ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); |
| ps.setString(1, tableOwner); |
| ps.setString(2, shortTableName); |
| rs = ps.executeQuery(); |
| if (!rs.next()) { |
| String message = tableName |
| + " is not a valid Netezza table. " |
| + "Please make sure that you have connected to the Netezza DB " |
| + "and the table name is right. The current values are\n\t" |
| + " connection string : " + options.getConnectString() |
| + "\n\t table owner : " + tableOwner + "\n\t table name : " |
| + shortTableName; |
| LOG.error(message); |
| throw new IOException(message); |
| } |
| } finally { |
| if (rs != null) { |
| rs.close(); |
| } |
| if (ps != null) { |
| ps.close(); |
| } |
| close(); |
| } |
| } catch (SQLException sqle) { |
| throw new IOException("SQL exception checking table " |
| + sqle.getMessage(), sqle); |
| } |
| } |
| |
| /** |
| * Export data stored in HDFS into a table in a database. |
| */ |
| public void exportTable(org.apache.sqoop.manager.ExportJobContext context) |
| throws IOException, ExportException { |
| options = context.getOptions(); |
| context.setConnManager(this); |
| |
| checkTable(); // Throws exception as necessary |
| NetezzaExternalTableExportJob exporter = null; |
| |
| char qc = (char) options.getInputEnclosedBy(); |
| char ec = (char) options.getInputEscapedBy(); |
| checkNullValueStrings(options.getInNullStringValue(), |
| options.getInNullNonStringValue()); |
| |
| if (qc > 0 && !(qc == '"' || qc == '\'')) { |
| throw new ExportException("Input enclosed-by character must be '\"' " |
| + "or ''' for netezza direct mode exports"); |
| } |
| if (ec > 0 && ec != '\\') { |
| throw new ExportException("Input escaped-by character must be '\\' " |
| + "for netezza direct mode exports"); |
| } |
| exporter = new NetezzaExternalTableExportJob(context); |
| exporter.runExport(); |
| } |
| |
| /** |
| * Import the table into HDFS by using Netezza external tables to pull out the |
| * data from the database and upload the files directly to HDFS. |
| */ |
| @Override |
| public void importTable(org.apache.sqoop.manager.ImportJobContext context) |
| throws IOException, ImportException { |
| |
| context.setConnManager(this); |
| |
| String tableName = context.getTableName(); |
| String jarFile = context.getJarFile(); |
| SqoopOptions options = context.getOptions(); |
| |
| if (null == tableName) { |
| LOG. |
| error("Netezza external table import does not support query imports."); |
| LOG. |
| error("Do not use --direct and --query together for Netezza."); |
| throw |
| new IOException("Null tableName for Netezza external table import."); |
| } |
| |
| checkNullValueStrings(options.getNullStringValue(), |
| options.getNullNonStringValue()); |
| |
| char qc = options.getOutputEnclosedBy(); |
| char ec = options.getOutputEscapedBy(); |
| |
| if (qc > 0 && !(qc == '"' || qc == '\'')) { |
| throw new ImportException("Output enclosed-by character must be '\"' " |
| + "or ''' for netezza direct mode imports"); |
| } |
| if (ec > 0 && ec != '\\') { |
| throw new ImportException("Output escaped-by character must be '\\' " |
| + "for netezza direct mode exports"); |
| } |
| |
| NetezzaExternalTableImportJob importer = null; |
| |
| importer = new NetezzaExternalTableImportJob(options, context); |
| |
| // Direct Netezza Manager will use the datasliceid so no split columns |
| // will be used. |
| |
| LOG.info("Beginning netezza fast path import"); |
| |
| if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) { |
| LOG.warn("File import layout " + options.getFileLayout() |
| + " is not supported by"); |
| LOG.warn("Netezza direct import; import will proceed as text files."); |
| } |
| |
| importer.runImport(tableName, jarFile, null, options.getConf()); |
| } |
| |
| protected RelatedOptions getNetezzaExtraOpts() { |
| // Just add the options from NetezzaManager and ignore the setting |
| // for direct mode access |
| RelatedOptions netezzaOpts = |
| new RelatedOptions("Netezza Connector Direct mode options"); |
| |
| netezzaOpts.addOption(OptionBuilder |
| .withArgName(NETEZZA_ERROR_THRESHOLD_OPT).hasArg() |
| .withDescription("Error threshold for the job") |
| .withLongOpt(NETEZZA_ERROR_THRESHOLD_LONG_ARG).create()); |
| netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT) |
| .hasArg().withDescription("Netezza log directory") |
| .withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create()); |
| netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CTRL_CHARS_OPT) |
| .withDescription("Allow control chars in data") |
| .withLongOpt(NETEZZA_CTRL_CHARS_LONG_ARG).create()); |
| netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TRUNC_STRING_OPT) |
| .withDescription("Truncate string to declared storage size") |
| .withLongOpt(NETEZZA_TRUNC_STRING_LONG_ARG).create()); |
| netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CRIN_STRING_OPT) |
| .withDescription("Truncate string to declared storage size") |
| .withLongOpt(NETEZZA_CRIN_STRING_LONG_ARG).create()); |
| netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_IGNORE_ZERO_OPT) |
| .withDescription("Truncate string to declared storage size") |
| .withLongOpt(NETEZZA_IGNORE_ZERO_LONG_ARG).create()); |
| netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TABLE_ENCODING_OPT) |
| .hasArg().withDescription("Table encoding") |
| .withLongOpt(NETEZZA_TABLE_ENCODING_LONG_ARG).create()); |
| return netezzaOpts; |
| } |
| |
| private void handleNetezzaExtraArgs(SqoopOptions opts) |
| throws ParseException { |
| |
| Configuration conf = opts.getConf(); |
| |
| String[] extraArgs = opts.getExtraArgs(); |
| |
| RelatedOptions netezzaOpts = getNetezzaExtraOpts(); |
| CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true); |
| if (cmdLine.hasOption(NETEZZA_ERROR_THRESHOLD_LONG_ARG)) { |
| int threshold = Integer.parseInt(cmdLine |
| .getOptionValue(NETEZZA_ERROR_THRESHOLD_LONG_ARG)); |
| conf.setInt(NETEZZA_ERROR_THRESHOLD_OPT, threshold); |
| } |
| if (cmdLine.hasOption(NETEZZA_LOG_DIR_LONG_ARG)) { |
| String dir = cmdLine.getOptionValue(NETEZZA_LOG_DIR_LONG_ARG); |
| conf.set(NETEZZA_LOG_DIR_OPT, dir); |
| } |
| if (cmdLine.hasOption(NETEZZA_TABLE_ENCODING_LONG_ARG)) { |
| String encoding = cmdLine |
| .getOptionValue(NETEZZA_TABLE_ENCODING_LONG_ARG); |
| conf.set(NETEZZA_TABLE_ENCODING_OPT, encoding); |
| } |
| |
| conf.setBoolean(NETEZZA_CTRL_CHARS_OPT, |
| cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG)); |
| |
| conf.setBoolean(NETEZZA_TRUNC_STRING_OPT, |
| cmdLine.hasOption(NETEZZA_TRUNC_STRING_LONG_ARG)); |
| |
| conf.setBoolean(NETEZZA_CRIN_STRING_OPT, |
| cmdLine.hasOption(NETEZZA_CRIN_STRING_LONG_ARG)); |
| |
| conf.setBoolean(NETEZZA_IGNORE_ZERO_OPT, |
| cmdLine.hasOption(NETEZZA_IGNORE_ZERO_LONG_ARG)); |
| |
| // Always true for Netezza direct mode access |
| conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true); |
| } |
| |
| @Override |
| public boolean supportsStagingForExport() { |
| return false; |
| } |
| |
| @Override |
| public boolean isORMFacilitySelfManaged() { |
| if (options.getHCatTableName() != null) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean isDirectModeHCatSupported() { |
| return true; |
| } |
| |
| |
| public static String getLocalLogDir(TaskAttemptID attemptId) { |
| int tid = attemptId.getTaskID().getId(); |
| int aid = attemptId.getId(); |
| String jid = attemptId.getJobID().toString(); |
| StringBuilder sb = new StringBuilder(jid).append('-'); |
| sb.append(tid).append('-').append(aid); |
| String localLogDir = sb.toString(); |
| return localLogDir; |
| } |
| } |