blob: 6dbb98d7fc525251a2185b91b41d60aa04a140dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db.netezza;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.io.NamedFifo;
import org.apache.sqoop.lib.DelimiterSet;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.manager.DirectNetezzaManager;
import org.apache.sqoop.mapreduce.SqoopMapper;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.util.FileUploader;
import org.apache.sqoop.util.PerfCounters;
import org.apache.sqoop.util.TaskId;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Netezza export mapper using external tables.
*/
public abstract class NetezzaExternalTableExportMapper<K, V> extends
SqoopMapper<K, V, NullWritable, NullWritable> {
/**
* Create a named FIFO, and start the Netezza JDBC thread connected to that
* FIFO. A File object representing the FIFO is in 'fifoFile'.
*/
private Configuration conf;
@VisibleForTesting
DBConfiguration dbc;
@VisibleForTesting
File fifoFile;
private Connection con;
private OutputStream recordWriter;
public static final Log LOG = LogFactory
.getLog(NetezzaExternalTableImportMapper.class.getName());
private NetezzaJDBCStatementRunner extTableThread;
private PerfCounters counter;
private DelimiterSet outputDelimiters;
private String localLogDir = null;
@VisibleForTesting
String logDir = null;
@VisibleForTesting
File taskAttemptDir = null;
private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
private String getSqlStatement(DelimiterSet delimiters) throws IOException {
char fd = delimiters.getFieldsTerminatedBy();
char qc = delimiters.getEnclosedBy();
char ec = delimiters.getEscapedBy();
String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
int errorThreshold = conf.getInt(
DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
boolean ctrlChars =
conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false);
boolean truncString =
conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false);
boolean ignoreZero =
conf.getBoolean(DirectNetezzaManager.NETEZZA_IGNORE_ZERO_OPT, false);
boolean crinString =
conf.getBoolean(DirectNetezzaManager.NETEZZA_CRIN_STRING_OPT, false);
StringBuilder sqlStmt = new StringBuilder(2048);
sqlStmt.append("INSERT INTO ");
sqlStmt.append(dbc.getOutputTableName());
sqlStmt.append(" SELECT * FROM EXTERNAL '");
sqlStmt.append(fifoFile.getAbsolutePath());
sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
if (crinString) {
sqlStmt.append(" CRINSTRING TRUE ");
} else {
sqlStmt.append(" CRINSTRING FALSE ");
}
if (ctrlChars) {
sqlStmt.append(" CTRLCHARS TRUE ");
}
if (truncString) {
sqlStmt.append(" TRUNCSTRING TRUE ");
}
if (ignoreZero) {
sqlStmt.append(" IGNOREZERO TRUE ");
}
sqlStmt.append(" DELIMITER ");
sqlStmt.append(Integer.toString(fd));
sqlStmt.append(" ENCODING 'internal' ");
if (ec > 0) {
sqlStmt.append(" ESCAPECHAR '\\' ");
}
sqlStmt.append(" FORMAT 'Text' ");
sqlStmt.append(" INCLUDEZEROSECONDS TRUE ");
sqlStmt.append(" NULLVALUE '");
if (nullValue != null) {
sqlStmt.append(nullValue);
} else {
sqlStmt.append("null");
}
sqlStmt.append("' ");
if (qc > 0) {
switch (qc) {
case '\'':
sqlStmt.append(" QUOTEDVALUE SINGLE ");
break;
case '\"':
sqlStmt.append(" QUOTEDVALUE DOUBLE ");
break;
default:
LOG.warn("Unsupported enclosed by character: " + qc + " - ignoring.");
}
}
sqlStmt.append(" MAXERRORS ").append(errorThreshold);
File logDirPath = new File(taskAttemptDir, localLogDir);
logDirPath.mkdirs();
if (logDirPath.canWrite() && logDirPath.isDirectory()) {
sqlStmt.append(" LOGDIR ")
.append(logDirPath.getAbsolutePath()).append(' ');
} else {
throw new IOException("Unable to create log directory specified");
}
sqlStmt.append(")");
String stmt = sqlStmt.toString();
LOG.debug("SQL generated for external table export" + stmt);
return stmt;
}
private void initNetezzaExternalTableExport(Context context)
throws IOException {
this.conf = context.getConfiguration();
taskAttemptDir = TaskId.getLocalWorkPath(conf);
localLogDir =
DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
if (logDir == null) { // need to be able to set in tests
logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
}
if (dbc == null) { // need to be able to mock in tests
dbc = new DBConfiguration(conf);
}
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ',');
char qc = (char) conf.getInt(DelimiterSet.INPUT_ENCLOSED_BY_KEY, 0);
char ec = (char) conf.getInt(DelimiterSet.INPUT_ESCAPED_BY_KEY, 0);
this.outputDelimiters = new DelimiterSet(fd, '\n', qc, ec, false);
this.fifoFile = new File(taskAttemptDir, ("nzexttable-export.txt"));
String filename = fifoFile.toString();
NamedFifo nf;
// Create the FIFO itself.
try {
nf = new NamedFifo(this.fifoFile);
nf.create();
} catch (IOException ioe) {
// Command failed.
LOG.error("Could not create FIFO file " + filename);
this.fifoFile = null;
throw new IOException(
"Could not create FIFO for netezza external table import", ioe);
}
String sqlStmt = getSqlStatement(outputDelimiters);
boolean cleanup = false;
try {
con = dbc.getConnection();
extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
con, sqlStmt);
} catch (SQLException sqle) {
cleanup = true;
throw new IOException(sqle);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
} finally {
if (con != null && cleanup) {
try {
con.close();
} catch (Exception e) {
LOG.debug("Exception closing connection " + e.getMessage());
}
}
con = null;
}
counter = new PerfCounters();
extTableThread.start();
// We start the JDBC thread first in this case as we want the FIFO reader to
// be running.
recordWriter = new BufferedOutputStream(new FileOutputStream(nf.getFile()));
counter.startClock();
}
@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);
initNetezzaExternalTableExport(context);
try {
while (context.nextKeyValue()) {
// Fail fast if there was an error during JDBC operation
if (jdbcFailed.get()) {
break;
}
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
} finally {
try {
recordWriter.close();
extTableThread.join();
} catch (Exception e) {
LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
}
counter.stopClock();
LOG.info("Transferred " + counter.toString());
FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
localLogDir, logDir, context.getJobID().toString(),
conf);
if (extTableThread.hasExceptions()) {
extTableThread.printException();
throw new IOException(extTableThread.getException());
}
}
}
protected void writeTextRecord(Text record) throws IOException {
String outputStr = record.toString() + "\n";
byte[] outputBytes = outputStr.getBytes("UTF-8");
counter.addBytes(outputBytes.length);
recordWriter.write(outputBytes, 0, outputBytes.length);
}
protected void writeSqoopRecord(SqoopRecord sqr) throws IOException {
String outputStr = sqr.toString(this.outputDelimiters);
byte[] outputBytes = outputStr.getBytes("UTF-8");
counter.addBytes(outputBytes.length);
recordWriter.write(outputBytes, 0, outputBytes.length);
}
}