/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.manager;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.mapreduce.ExportInputFormat;
import org.apache.sqoop.mapreduce.postgresql.PostgreSQLCopyExportJob;
import org.apache.sqoop.util.PostgreSQLUtils;
import org.apache.sqoop.util.SubstitutionUtils;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.io.SplittableBufferedWriter;
import com.cloudera.sqoop.manager.ExportJobContext;
import com.cloudera.sqoop.util.AsyncSink;
import com.cloudera.sqoop.util.DirectImportUtils;
import com.cloudera.sqoop.util.ErrorableAsyncSink;
import com.cloudera.sqoop.util.ErrorableThread;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.Executor;
import com.cloudera.sqoop.util.ImportException;
import com.cloudera.sqoop.util.JdbcUrl;
import com.cloudera.sqoop.util.LoggingAsyncSink;
import com.cloudera.sqoop.util.PerfCounters;
/**
 * Manages direct dumps from PostgreSQL databases via psql COPY TO STDOUT
* commands.
*/
public class DirectPostgresqlManager
extends com.cloudera.sqoop.manager.PostgresqlManager {
public static final Log LOG = LogFactory.getLog(
DirectPostgresqlManager.class.getName());
public static final String BOOLEAN_TRUE_STRING = "boolean-true-string";
public static final String DEFAULT_BOOLEAN_TRUE_STRING = "TRUE";
public static final String BOOLEAN_FALSE_STRING = "boolean-false-string";
public static final String DEFAULT_BOOLEAN_FALSE_STRING = "FALSE";
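  // Long option names (with the defaults above) that control how boolean
  // and bit columns are rendered in the generated SELECT list; they are
  // registered in getExtraOptions() and consumed in applyExtraArguments()
  // below.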
public DirectPostgresqlManager(final SqoopOptions opts) {
super(opts);
if (this.booleanFalseString == null) {
this.booleanFalseString = DEFAULT_BOOLEAN_FALSE_STRING;
}
if (booleanTrueString == null) {
this.booleanTrueString = DEFAULT_BOOLEAN_TRUE_STRING;
}
}
private static final String PSQL_CMD = "psql";
private String booleanTrueString;
private String booleanFalseString;
  /**
   * Copies data directly into HDFS, adding the user's chosen line
   * terminator char to each record.
   */
static class PostgresqlAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
private final SqoopOptions options;
PostgresqlAsyncSink(final SplittableBufferedWriter w,
final SqoopOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
    @Override
    public void processStream(InputStream is) {
child = new PostgresqlStreamThread(is, writer, options, counters);
child.start();
}
private static class PostgresqlStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(
PostgresqlStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final InputStream stream;
private final SqoopOptions options;
private final PerfCounters counters;
PostgresqlStreamThread(final InputStream is,
final SplittableBufferedWriter w,
final SqoopOptions opts, final PerfCounters ctrs) {
this.stream = is;
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
      @Override
      public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
char recordDelim = this.options.getOutputRecordDelim();
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// read/write transfer loop here.
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF
}
w.write(inLine);
w.write(recordDelim);
w.allowSplit();
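            // Note: addBytes() receives a character count here; this equals
            // the byte count only for single-byte output encodings.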
counters.addBytes(1 + inLine.length());
}
} catch (IOException ioe) {
LOG.error("IOException reading from psql: " + ioe.toString());
// set the error bit so our caller can see that something went wrong.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
}
}
}
}
/**
* Takes a list of columns and turns them into a string like
* "col1, col2, col3...".
*/
private String getColumnListStr(String [] cols) {
if (null == cols) {
return null;
}
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String col : cols) {
if (!first) {
sb.append(", ");
}
sb.append(escapeColName(col));
first = false;
}
return sb.toString();
}
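  // For example, given the columns {id, first name} this would produce
  // (illustrative):
  //   "id", "first name"
  // assuming escapeColName() double-quotes identifiers, as the PostgreSQL
  // manager does.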
  /**
   * We need to emit 'TRUE' and 'FALSE' for boolean results instead of
   * PostgreSQL's t/f syntax; otherwise Hive will fail to parse them and
   * will import nulls.
   *
   * https://issues.cloudera.org/browse/SQOOP-43
   *
   * @param cols the column names to include in the select list.
   * @param tableName the table being imported.
   * @return a select-list string with boolean and bit columns rewritten
   * as CASE expressions.
   */
private String getSelectListColumnsStr(String [] cols, String tableName) {
if (null == cols || tableName == null) {
return null;
}
Map<String, String> columnTypes = getColumnTypeNamesForTable(tableName);
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String col : cols) {
String colEscaped = escapeColName(col);
if (!first) {
sb.append(", ");
}
      if (columnTypes.get(col) == null) {
        LOG.error("Cannot find " + col + " in type metadata");
        sb.append(col);
} else {
if ("bool".equalsIgnoreCase(columnTypes.get(col))) {
sb.append(String.format("case when %s=true then "
+ "'" + booleanTrueString + "' "
+ "when %s=false then "
+ "'" + booleanFalseString + "'"
+ " end as %s",
colEscaped, colEscaped, colEscaped));
} else if ("bit".equalsIgnoreCase(columnTypes.get(col))) {
sb.append(String.format("case when %s=B'1' then "
+ "'" + booleanTrueString + "' "
+ "when %s=B'0' then "
+ "'" + booleanFalseString + "'"
+ " end as %s",
colEscaped, colEscaped, colEscaped));
} else {
sb.append(colEscaped);
}
}
first = false;
}
return sb.toString();
}
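  // For a bool column named "active" with the default encodings, the
  // fragment generated above would be (illustrative):
  //   case when "active"=true then 'TRUE'
  //     when "active"=false then 'FALSE' end as "active"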
/**
* @return the Postgresql-specific SQL command to copy the
* table ("COPY .... TO STDOUT").
*/
private String getCopyCommand(String tableName) {
// Format of this command is:
//
// COPY table(col, col....) TO STDOUT
// or COPY ( query ) TO STDOUT
// WITH DELIMITER 'fieldsep'
// CSV
// QUOTE 'quotechar'
// ESCAPE 'escapechar'
// FORCE QUOTE col, col, col....
StringBuilder sb = new StringBuilder();
String [] cols = getColumnNames(tableName);
String escapedTableName = escapeTableName(tableName);
sb.append("COPY ");
String whereClause = this.options.getWhereClause();
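    // Fall back to a tautology so the generated statement always carries a
    // syntactically valid WHERE clause.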
if (whereClause == null || whereClause.isEmpty()) {
whereClause = "1=1";
}
// Import from a SELECT QUERY
sb.append("(");
sb.append("SELECT ");
if (null != cols) {
sb.append(getSelectListColumnsStr(cols, tableName));
} else {
sb.append("*");
}
sb.append(" FROM ");
sb.append(escapedTableName);
sb.append(" WHERE ");
sb.append(whereClause);
sb.append(")");
// Translate delimiter characters to '\ooo' octal representation.
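    // For example, the default comma delimiter (decimal 44) is emitted
    // as E'\054'.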
sb.append(" TO STDOUT WITH DELIMITER E'\\");
sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8));
sb.append("' CSV ");
if (this.options.getNullStringValue() != null) {
sb.append("NULL AS E'");
sb.append(SubstitutionUtils.removeEscapeCharacters(
this.options.getNullStringValue()));
sb.append("' ");
}
if (this.options.getOutputEnclosedBy() != '\0') {
sb.append("QUOTE E'\\");
sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8));
sb.append("' ");
}
if (this.options.getOutputEscapedBy() != '\0') {
sb.append("ESCAPE E'\\");
sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8));
sb.append("' ");
}
// add the "FORCE QUOTE col, col, col..." clause if quotes are required.
if (null != cols && this.options.isOutputEncloseRequired()) {
sb.append("FORCE QUOTE ");
sb.append(getColumnListStr(cols));
}
sb.append(";");
String copyCmd = sb.toString();
LOG.info("Copy command is " + copyCmd);
return copyCmd;
}
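  // An example of the command produced above, assuming the default comma
  // field delimiter and no quoting options (illustrative):
  //   COPY (SELECT "id", "name" FROM "employees" WHERE 1=1)
  //     TO STDOUT WITH DELIMITER E'\054' CSV ;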
/** Write the COPY command to a temp file.
* @return the filename we wrote to.
*/
private String writeCopyCommand(String command) throws IOException {
String tmpDir = options.getTempDir();
File tempFile = File.createTempFile("tmp-", ".sql", new File(tmpDir));
BufferedWriter w = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(tempFile)));
w.write(command);
w.newLine();
w.close();
return tempFile.toString();
}
// TODO(aaron): Refactor this method to be much shorter.
// CHECKSTYLE:OFF
  /**
   * Import the table into HDFS by using psql to pull the data out of the
   * db via COPY ... TO STDOUT.
   */
  @Override
public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
throws IOException, ImportException {
context.setConnManager(this);
String tableName = context.getTableName();
SqoopOptions options = context.getOptions();
LOG.info("Beginning psql fast path import");
if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
// TODO(aaron): Support SequenceFile-based load-in
LOG.warn("File import layout" + options.getFileLayout()
+ " is not supported by");
LOG.warn("Postgresql direct import; import will proceed as text files.");
}
    if (!StringUtils.equals(options.getNullStringValue(),
        options.getNullNonStringValue())) {
      throw new ImportException(
          "Detected different values of the --null-string and "
          + "--null-non-string parameters. PostgreSQL direct import does "
          + "not support that. Please either use the same value for both "
          + "or omit the --direct parameter.");
    }
String commandFilename = null;
String passwordFilename = null;
Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// Get the COPY TABLE command to issue, write this to a file, and pass
// it in to psql with -f filename. Then make sure we delete this file
// in our finally block.
String copyCmd = getCopyCommand(tableName);
commandFilename = writeCopyCommand(copyCmd);
// Arguments to pass to psql on the command line.
ArrayList<String> args = new ArrayList<String>();
// Environment to pass to psql.
List<String> envp = Executor.getCurEnvpStrings();
      // We need to parse the connect string URI to determine the database
      // name and the host and port to pass to psql on the command line.
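      // For example, "jdbc:postgresql://dbhost:5432/sales" parses to host
      // "dbhost", port 5432, and database name "sales".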
String connectString = options.getConnectString();
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new ImportException("Could not determine database name");
}
LOG.info("Performing import of table " + tableName + " from database "
+ databaseName);
args.add(PSQL_CMD); // requires that this is on the path.
args.add("--tuples-only");
args.add("--quiet");
String username = options.getUsername();
if (username != null) {
args.add("--username");
args.add(username);
String password = options.getPassword();
if (null != password) {
passwordFilename =
PostgreSQLUtils.writePasswordFile(options.getTempDir(), password);
// Need to send PGPASSFILE environment variable specifying
// location of our postgres file.
envp.add("PGPASSFILE=" + passwordFilename);
}
}
args.add("--host");
args.add(hostname);
if (port != -1) {
args.add("--port");
args.add(Integer.toString(port));
}
if (null != databaseName && databaseName.length() > 0) {
args.add(databaseName);
}
// The COPY command is in a script file.
args.add("-f");
args.add(commandFilename);
// begin the import in an external process.
LOG.debug("Starting psql with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
}
// This writer will be closed by AsyncSink.
SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
options.getConf(), options, context);
// Actually start the psql dump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]),
envp.toArray(new String[0]));
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
sink = new PostgresqlAsyncSink(w, options, counters);
LOG.debug("Starting stream sink");
counters.startClock();
sink.processStream(is);
errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
LOG.debug("Waiting for process completion");
int result = 0;
if (null != p) {
while (true) {
try {
result = p.waitFor();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Remove any password file we wrote
if (null != passwordFilename) {
if (!new File(passwordFilename).delete()) {
LOG.error("Could not remove postgresql password file "
+ passwordFilename);
LOG.error("You should remove this file to protect your credentials.");
}
}
if (null != commandFilename) {
        // We wrote the COPY command to a tmpfile. Remove it.
if (!new File(commandFilename).delete()) {
LOG.info("Could not remove temp file: " + commandFilename);
}
}
// block until the stream sink is done too.
int streamResult = 0;
if (null != sink) {
while (true) {
try {
streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Attempt to block for stderr stream sink; errors are advisory.
if (null != errSink) {
try {
if (0 != errSink.join()) {
LOG.info("Encountered exception reading stderr stream");
}
} catch (InterruptedException ie) {
LOG.info("Thread interrupted waiting for stderr to complete: "
+ ie.toString());
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {
throw new IOException("psql terminated with status "
+ Integer.toString(result));
}
if (0 != streamResult) {
throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
LOG.info("Transferred " + counters.toString());
}
}
@Override
public boolean supportsStagingForExport() {
return true;
}
// CHECKSTYLE:ON
/** {@inheritDoc}. */
@Override
protected void applyExtraArguments(CommandLine cmdLine) {
super.applyExtraArguments(cmdLine);
if (cmdLine.hasOption(BOOLEAN_TRUE_STRING)) {
String arg = cmdLine.getOptionValue(BOOLEAN_TRUE_STRING);
LOG.info("Loaded TRUE encoding string " + arg);
this.booleanTrueString = arg;
}
if (cmdLine.hasOption(BOOLEAN_FALSE_STRING)) {
String arg = cmdLine.getOptionValue(BOOLEAN_FALSE_STRING);
LOG.info("Loaded FALSE encoding string " + arg);
this.booleanFalseString = arg;
}
}
/** {@inheritDoc}. */
@Override
@SuppressWarnings("static-access")
protected RelatedOptions getExtraOptions() {
RelatedOptions extraOptions = super.getExtraOptions();
extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
.withDescription("String to encode TRUE value")
.withLongOpt(BOOLEAN_TRUE_STRING).create());
extraOptions.addOption(OptionBuilder.withArgName("string").hasArg()
.withDescription("String to encode FALSE value")
.withLongOpt(BOOLEAN_FALSE_STRING).create());
return extraOptions;
}
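  // These extra arguments are supplied after the "--" separator on the
  // sqoop command line, e.g. (illustrative):
  //   sqoop import --connect jdbc:postgresql://dbhost/sales \
  //     --table employees --direct \
  //     -- --boolean-true-string 1 --boolean-false-string 0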
  /** {@inheritDoc}. */
  @Override
  public void exportTable(ExportJobContext context)
throws IOException, ExportException {
context.setConnManager(this);
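    // Run a map-only job that streams records into PostgreSQL through the
    // COPY machinery rather than generating per-row INSERT statements.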
PostgreSQLCopyExportJob job =
new PostgreSQLCopyExportJob(context,
null,
ExportInputFormat.class,
NullOutputFormat.class);
job.runExport();
}
}