blob: 7c80f70c0182d439e391a9a4977257a837512cc9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.manager;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.util.AsyncSink;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.ImportException;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
import org.apache.hadoop.sqoop.util.PerfCounters;
/**
* Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
* commands.
*/
public class DirectPostgresqlManager extends PostgresqlManager {
public static final Log LOG = LogFactory.getLog(DirectPostgresqlManager.class.getName());
public DirectPostgresqlManager(final SqoopOptions opts) {
// Inform superclass that we're overriding import method via alt. constructor.
super(opts, true);
}
private static final String PSQL_CMD = "psql";
/** Copies data directly into HDFS, adding the user's chosen line terminator
char to each record.
*/
static class PostgresqlAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
private final SqoopOptions options;
PostgresqlAsyncSink(final SplittableBufferedWriter w, final SqoopOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
public void processStream(InputStream is) {
child = new PostgresqlStreamThread(is, writer, options, counters);
child.start();
}
private static class PostgresqlStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final InputStream stream;
private final SqoopOptions options;
private final PerfCounters counters;
PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
final SqoopOptions opts, final PerfCounters ctrs) {
this.stream = is;
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
char recordDelim = this.options.getOutputRecordDelim();
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// read/write transfer loop here.
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF
}
w.write(inLine);
w.write(recordDelim);
w.allowSplit();
counters.addBytes(1 + inLine.length());
}
} catch (IOException ioe) {
LOG.error("IOException reading from psql: " + ioe.toString());
// set the error bit so our caller can see that something went wrong.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
}
}
}
}
/**
Takes a list of columns and turns them into a string like "col1, col2, col3..."
*/
private String getColumnListStr(String [] cols) {
if (null == cols) {
return null;
}
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String col : cols) {
if (!first) {
sb.append(", ");
}
sb.append(col);
first = false;
}
return sb.toString();
}
/**
* @return the Postgresql-specific SQL command to copy the table ("COPY .... TO STDOUT")
*/
private String getCopyCommand(String tableName) {
/*
Format of this command is:
COPY table(col, col....) TO STDOUT
or COPY ( query ) TO STDOUT
WITH DELIMITER 'fieldsep'
CSV
QUOTE 'quotechar'
ESCAPE 'escapechar'
FORCE QUOTE col, col, col....
*/
StringBuilder sb = new StringBuilder();
String [] cols = getColumnNames(tableName);
sb.append("COPY ");
String whereClause = this.options.getWhereClause();
if (whereClause != null && whereClause.length() > 0) {
// Import from a SELECT QUERY
sb.append("(");
sb.append("SELECT ");
if (null != cols) {
sb.append(getColumnListStr(cols));
} else {
sb.append("*");
}
sb.append(" FROM ");
sb.append(tableName);
sb.append(" WHERE ");
sb.append(whereClause);
sb.append(")");
} else {
// Import just the table.
sb.append(tableName);
if (null != cols) {
// specify columns.
sb.append("(");
sb.append(getColumnListStr(cols));
sb.append(")");
}
}
// Translate delimiter characters to '\ooo' octal representation.
sb.append(" TO STDOUT WITH DELIMITER E'\\");
sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8));
sb.append("' CSV ");
if (this.options.getOutputEnclosedBy() != '\0') {
sb.append("QUOTE E'\\");
sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8));
sb.append("' ");
}
if (this.options.getOutputEscapedBy() != '\0') {
sb.append("ESCAPE E'\\");
sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8));
sb.append("' ");
}
// add the "FORCE QUOTE col, col, col..." clause if quotes are required.
if (null != cols && this.options.isOutputEncloseRequired()) {
sb.append("FORCE QUOTE ");
sb.append(getColumnListStr(cols));
}
sb.append(";");
String copyCmd = sb.toString();
LOG.debug("Copy command is " + copyCmd);
return copyCmd;
}
/** Write the COPY command to a temp file
* @return the filename we wrote to.
*/
private String writeCopyCommand(String command) throws IOException {
String tmpDir = options.getTempDir();
File tempFile = File.createTempFile("tmp-",".sql", new File(tmpDir));
BufferedWriter w = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(tempFile)));
w.write(command);
w.newLine();
w.close();
return tempFile.toString();
}
/** Write the user's password to a file that is chmod 0600.
@return the filename.
*/
private String writePasswordFile(String password) throws IOException {
String tmpDir = options.getTempDir();
File tempFile = File.createTempFile("pgpass",".pgpass", new File(tmpDir));
LOG.debug("Writing password to tempfile: " + tempFile);
// Make sure it's only readable by the current user.
DirectImportUtils.setFilePermissions(tempFile, "0600");
// Actually write the password data into the file.
BufferedWriter w = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(tempFile)));
w.write("*:*:*:*:" + password);
w.close();
return tempFile.toString();
}
@Override
/**
* Import the table into HDFS by using psql to pull the data out of the db
* via COPY FILE TO STDOUT.
*/
public void importTable(ImportJobContext context)
throws IOException, ImportException {
String tableName = context.getTableName();
String jarFile = context.getJarFile();
SqoopOptions options = context.getOptions();
LOG.info("Beginning psql fast path import");
if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
// TODO(aaron): Support SequenceFile-based load-in
LOG.warn("File import layout" + options.getFileLayout()
+ " is not supported by");
LOG.warn("Postgresql direct import; import will proceed as text files.");
}
String commandFilename = null;
String passwordFilename = null;
Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// Get the COPY TABLE command to issue, write this to a file, and pass it
// in to psql with -f filename.
// Then make sure we delete this file in our finally block.
String copyCmd = getCopyCommand(tableName);
commandFilename = writeCopyCommand(copyCmd);
// Arguments to pass to psql on the command line.
ArrayList<String> args = new ArrayList<String>();
// Environment to pass to psql.
List<String> envp = Executor.getCurEnvpStrings();
// We need to parse the connect string URI to determine the database name
// and the host and port. If the host is localhost and the port is not specified,
// we don't want to pass this to psql, because we want to force the use of a
// UNIX domain socket, not a TCP/IP socket.
String connectString = options.getConnectString();
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new ImportException("Could not determine database name");
}
LOG.info("Performing import of table " + tableName + " from database " + databaseName);
args.add(PSQL_CMD); // requires that this is on the path.
args.add("--tuples-only");
args.add("--quiet");
String username = options.getUsername();
if (username != null) {
args.add("--username");
args.add(username);
String password = options.getPassword();
if (null != password) {
passwordFilename = writePasswordFile(password);
// Need to send PGPASSFILE environment variable specifying
// location of our postgres file.
envp.add("PGPASSFILE=" + passwordFilename);
}
}
if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
args.add("--host");
args.add(hostname);
args.add("--port");
args.add(Integer.toString(port));
}
if (null != databaseName && databaseName.length() > 0) {
args.add(databaseName);
}
// The COPY command is in a script file.
args.add("-f");
args.add(commandFilename);
// begin the import in an external process.
LOG.debug("Starting psql with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
}
// This writer will be closed by AsyncSink.
SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
options.getConf(), options, tableName);
// Actually start the psql dump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]),
envp.toArray(new String[0]));
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
sink = new PostgresqlAsyncSink(w, options, counters);
LOG.debug("Starting stream sink");
counters.startClock();
sink.processStream(is);
errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
LOG.debug("Waiting for process completion");
int result = 0;
if (null != p) {
while (true) {
try {
result = p.waitFor();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Remove any password file we wrote
if (null != passwordFilename) {
if (!new File(passwordFilename).delete()) {
LOG.error("Could not remove postgresql password file " + passwordFilename);
LOG.error("You should remove this file to protect your credentials.");
}
}
if (null != commandFilename) {
// We wrote the COPY comand to a tmpfile. Remove it.
if (!new File(commandFilename).delete()) {
LOG.info("Could not remove temp file: " + commandFilename);
}
}
// block until the stream sink is done too.
int streamResult = 0;
if (null != sink) {
while (true) {
try {
streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Attempt to block for stderr stream sink; errors are advisory.
if (null != errSink) {
try {
if (0 != errSink.join()) {
LOG.info("Encountered exception reading stderr stream");
}
} catch (InterruptedException ie) {
LOG.info("Thread interrupted waiting for stderr to complete: "
+ ie.toString());
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {
throw new IOException("psql terminated with status "
+ Integer.toString(result));
}
if (0 != streamResult) {
throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
LOG.info("Transferred " + counters.toString());
}
}
}