/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.sqoop.manager;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.SqoopOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.AsyncSink;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.ImportException;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
import org.apache.hadoop.sqoop.util.PerfCounters;
/**
* Manages direct connections to MySQL databases
* so we can use mysqldump to get really fast dumps.
*/
public class LocalMySQLManager extends MySQLManager {
public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
// AsyncSinks used to import data from mysqldump directly into HDFS.
/**
* Copies data directly from mysqldump into HDFS, after stripping some
* header and footer characters that are attached to each line in mysqldump.
*/
static class CopyingAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
CopyingAsyncSink(final SplittableBufferedWriter w,
final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
public void processStream(InputStream is) {
child = new CopyingStreamThread(is, writer, counters);
child.start();
}
private static class CopyingStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(
CopyingStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
CopyingStreamThread(final InputStream is,
final SplittableBufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
}
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// Actually do the read/write transfer loop here.
int preambleLen = -1; // -1 means "not yet determined".
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
// this line is of the form "INSERT .. VALUES ( actual value text );"
// strip the leading preamble up to the '(' and the trailing ');'.
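// Illustrative example (hypothetical values): the line
//   INSERT INTO `employees` VALUES (1,'aaron','engineering');
// is written to HDFS as
//   1,'aaron','engineering'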
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
}
// chop off the leading and trailing text as we write the
// output to HDFS.
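// The trailing two characters are mysqldump's ");" terminator.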
int len = inLine.length() - 2 - preambleLen;
w.write(inLine, preambleLen, len);
w.newLine();
counters.addBytes(1 + len);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so we get an error status back in the caller.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
}
}
}
}
/**
* The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
* output, and re-emit the text in the user's specified output format.
*/
static class ReparsingAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final SqoopOptions options;
private final PerfCounters counters;
ReparsingAsyncSink(final SplittableBufferedWriter w,
final SqoopOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
public void processStream(InputStream is) {
child = new ReparsingStreamThread(is, writer, options, counters);
child.start();
}
private static class ReparsingStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(
ReparsingStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final SqoopOptions options;
private final InputStream stream;
private final PerfCounters counters;
ReparsingStreamThread(final InputStream is,
final SplittableBufferedWriter w, final SqoopOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.stream = is;
this.counters = ctrs;
}
private static final char MYSQL_FIELD_DELIM = ',';
private static final char MYSQL_RECORD_DELIM = '\n';
private static final char MYSQL_ENCLOSE_CHAR = '\'';
private static final char MYSQL_ESCAPE_CHAR = '\\';
private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
private static final RecordParser MYSQLDUMP_PARSER;
static {
// build a record parser for mysqldump's format
MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
MYSQL_ENCLOSE_REQUIRED);
}
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
char outputFieldDelim = options.getOutputFieldDelim();
char outputRecordDelim = options.getOutputRecordDelim();
String outputEnclose = "" + options.getOutputEnclosedBy();
String outputEscape = "" + options.getOutputEscapedBy();
boolean outputEncloseRequired = options.isOutputEncloseRequired();
char [] encloseFor = { outputFieldDelim, outputRecordDelim };
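// Illustrative example (hypothetical delimiters): with a tab as the
// output field delimiter and no enclosing character, the mysqldump
// record text
//   1,'hello, world',3.0
// is re-emitted as
//   1<TAB>hello, world<TAB>3.0
// RecordParser strips mysqldump's quoting; FieldFormatter then applies
// the user's escaping/enclosing rules.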
// Actually do the read/write transfer loop here.
int preambleLen = -1; // -1 means "not yet determined".
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
// this line is of the form "INSERT .. VALUES ( actual value text );"
// strip the leading preamble up to the '(' and the trailing ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
}
// Wrap the input string in a char buffer that ignores the leading and trailing
// text.
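// (inLine.length() - 2 excludes mysqldump's trailing ");".)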
CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen, inLine.length() - 2);
// Pass this along to the parser
List<String> fields = null;
try {
fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
} catch (RecordParser.ParseError pe) {
LOG.warn("ParseError reading from mysqldump: " + pe.toString()
+ "; record skipped");
continue; // Skip this record; 'fields' is null and would NPE below.
}
// For all of the output fields, emit them using the delimiters the user chooses.
boolean first = true;
int recordLen = 1; // Start at 1 to count the record delimiter.
for (String field : fields) {
if (!first) {
w.write(outputFieldDelim);
recordLen++; // Count the field delimiter toward the byte total.
} else {
first = false;
}
String fieldStr = FieldFormatter.escapeAndEnclose(field, outputEscape, outputEnclose,
encloseFor, outputEncloseRequired);
w.write(fieldStr);
recordLen += fieldStr.length();
}
w.write(outputRecordDelim);
w.allowSplit();
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
if (null != w) {
try {
w.close();
} catch (IOException ioe) {
LOG.info("Error closing HDFS stream: " + ioe.toString());
}
}
}
}
}
}
public LocalMySQLManager(final SqoopOptions options) {
super(options, false);
}
private static final String MYSQL_DUMP_CMD = "mysqldump";
/**
* @return true if the user's output delimiters match those used by mysqldump.
* fields: ,
* lines: \n
* optional-enclose: \'
* escape: \\
*/
private boolean outputDelimsAreMySQL() {
return options.getOutputFieldDelim() == ','
&& options.getOutputRecordDelim() == '\n'
&& options.getOutputEnclosedBy() == '\''
&& options.getOutputEscapedBy() == '\\'
&& !options.isOutputEncloseRequired(); // encloser is optional
}
/**
* Writes the user's password to a tmp file with 0600 permissions.
* @return the filename used.
*/
private String writePasswordFile() throws IOException {
// Create the temp file to hold the user's password.
String tmpDir = options.getTempDir();
File tempFile = File.createTempFile("mysql-cnf", ".cnf", new File(tmpDir));
// Make the password file only private readable.
DirectImportUtils.setFilePermissions(tempFile, "0600");
// If we're here, the password file is believed to be ours alone.
// The inability to set chmod 0600 inside Java is troublesome. We have to trust
// that the external 'chmod' program in the path does the right thing, and returns
// the correct exit status. But given our inability to re-read the permissions
// associated with a file, we'll have to make do with this.
String password = options.getPassword();
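// The generated file is a minimal MySQL options file; e.g., with a
// hypothetical password "hunter2" it would contain:
//   [client]
//   password=hunter2
// mysqldump reads it via --defaults-file, keeping the password off the
// command line and out of 'ps' output.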
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tempFile)));
w.write("[client]\n");
w.write("password=" + password + "\n");
w.close();
return tempFile.toString();
}
/**
 * Import the table by using mysqldump to pull the data out of the
 * database and writing it directly to HDFS.
 */
public void importTable(ImportJobContext context)
throws IOException, ImportException {
String tableName = context.getTableName();
String jarFile = context.getJarFile();
SqoopOptions options = context.getOptions();
LOG.info("Beginning mysqldump fast path import");
if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
// TODO(aaron): Support SequenceFile-based load-in
LOG.warn("File import layout " + options.getFileLayout()
+ " is not supported by");
LOG.warn("MySQL direct import; import will proceed as text files.");
}
ArrayList<String> args = new ArrayList<String>();
// We need to parse the connect string URI to determine the database
// name. Using java.net.URL directly on the connect string will fail because
// Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme
// (everything before '://') and replace it with 'http', which we know will work.
String connectString = options.getConnectString();
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new ImportException("Could not determine database name");
}
LOG.info("Performing import of table " + tableName + " from database " + databaseName);
args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
String password = options.getPassword();
String passwordFile = null;
Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
if (null != password && password.length() > 0) {
passwordFile = writePasswordFile();
args.add("--defaults-file=" + passwordFile);
}
String whereClause = options.getWhereClause();
if (null != whereClause) {
// Don't use the --where="<whereClause>" version because spaces in it can confuse
// Java, and adding in surrounding quotes confuses Java as well.
args.add("-w");
args.add(whereClause);
}
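// If the database isn't local (or an explicit port was given), pass
// --host/--port so mysqldump connects over the network rather than
// assuming a local server.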
if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
args.add("--host=" + hostname);
args.add("--port=" + Integer.toString(port));
}
args.add("--skip-opt");
args.add("--compact");
args.add("--no-create-db");
args.add("--no-create-info");
args.add("--quick"); // no buffering
args.add("--single-transaction");
String username = options.getUsername();
if (null != username) {
args.add("--user=" + username);
}
// If the user supplied extra args, add them here.
String [] extra = options.getExtraArgs();
if (null != extra) {
for (String arg : extra) {
args.add(arg);
}
}
args.add(databaseName);
args.add(tableName);
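// With hypothetical values throughout, the assembled command resembles:
//   mysqldump --defaults-file=/tmp/mysql-cnf1234.cnf -w "id > 100"
//     --host=db.example.com --port=3306 --skip-opt --compact
//     --no-create-db --no-create-info --quick --single-transaction
//     --user=sqoop employees_db employees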
// begin the import in an external process.
LOG.debug("Starting mysqldump with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
}
// This writer will be closed by AsyncSink.
SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
options.getConf(), options, tableName);
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
if (outputDelimsAreMySQL()) {
LOG.debug("Output delimiters conform to mysqldump; using straight copy");
sink = new CopyingAsyncSink(w, counters);
} else {
LOG.debug("User-specified delimiters; using reparsing import");
LOG.info("Converting data to use specified delimiters.");
LOG.info("(For the fastest possible import, use");
LOG.info("--mysql-delimiters to specify the same field");
LOG.info("delimiters as are used by mysqldump.)");
sink = new ReparsingAsyncSink(w, options, counters);
}
// Start an async thread to read and upload the whole stream.
counters.startClock();
sink.processStream(is);
// Start an async thread to send stderr to log4j.
errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
int result = 0;
if (null != p) {
while (true) {
try {
result = p.waitFor();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Remove the password file.
if (null != passwordFile) {
if (!new File(passwordFile).delete()) {
LOG.error("Could not remove mysql password file " + passwordFile);
LOG.error("You should remove this file to protect your credentials.");
}
}
// block until the stream sink is done too.
int streamResult = 0;
if (null != sink) {
while (true) {
try {
streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Try to wait for stderr to finish, but regard any errors as advisory.
if (null != errSink) {
try {
if (0 != errSink.join()) {
LOG.info("Encountered exception reading stderr stream");
}
} catch (InterruptedException ie) {
LOG.info("Thread interrupted waiting for stderr to complete: "
+ ie.toString());
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {
throw new IOException("mysqldump terminated with status "
+ Integer.toString(result));
}
if (0 != streamResult) {
throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
LOG.info("Transferred " + counters.toString());
}
}
}