/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.util.AsyncSink;
import org.apache.sqoop.util.JdbcUrl;
import org.apache.sqoop.util.PerfCounters;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
import org.apache.sqoop.lib.DelimiterSet;
import org.apache.sqoop.lib.FieldFormatter;
import org.apache.sqoop.lib.RecordParser;
import org.apache.sqoop.manager.MySQLUtils;
import org.apache.sqoop.util.ErrorableAsyncSink;
import org.apache.sqoop.util.ErrorableThread;
import org.apache.sqoop.util.LoggingAsyncSink;
/**
* Mapper that opens up a pipe to mysqldump and pulls data directly.
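*
* The input key for each call to map() is a String of WHERE-clause conditions
* identifying the split to dump; each output key is one line of record text.
* This mapper is used on Sqoop's direct MySQL import path (the --direct
* option); see MySQLDumpImportJob for the job that configures and runs it.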
*/
public class MySQLDumpMapper
extends SqoopMapper<String, NullWritable, String, NullWritable> {
public static final Log LOG = LogFactory.getLog(
MySQLDumpMapper.class.getName());
private Configuration conf;
// AsyncSinks used to import data from mysqldump directly into HDFS.
/**
* Copies data directly from mysqldump into HDFS, after stripping some
* header and footer characters that are attached to each line in mysqldump.
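*
* For example (illustrative values), a mysqldump line such as
*   INSERT INTO `foo` VALUES (1,'alice','2010-01-01');
* is copied to the output as
*   1,'alice','2010-01-01'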
*/
public static class CopyingAsyncSink extends ErrorableAsyncSink {
private final MySQLDumpMapper.Context context;
private final PerfCounters counters;
protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
final PerfCounters ctrs) {
this.context = context;
this.counters = ctrs;
}
public void processStream(InputStream is) {
child = new CopyingStreamThread(is, context, counters);
child.start();
}
private static class CopyingStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(
CopyingStreamThread.class.getName());
private final MySQLDumpMapper.Context context;
private final InputStream stream;
private final PerfCounters counters;
CopyingStreamThread(final InputStream is,
final Context c, final PerfCounters ctrs) {
this.context = c;
this.stream = is;
this.counters = ctrs;
}
public void run() {
BufferedReader r = null;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
continue; // comments and empty lines are ignored
}
// This line is of the form "INSERT .. VALUES ( actual value text );".
// Strip the leading preamble up to the '(' and the trailing ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark)
+ recordStartMark.length();
}
// chop off the leading and trailing text as we write the
// output to HDFS.
int len = inLine.length() - 2 - preambleLen;
context.write(inLine.substring(preambleLen, inLine.length() - 2)
+ "\n", null);
counters.addBytes(1 + len);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so we get an error status back in the caller.
setError();
} catch (InterruptedException ie) {
LOG.error("InterruptedException reading from mysqldump: "
+ ie.toString());
// flag this error so we get an error status back in the caller.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
}
}
}
}
/**
* The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
* output, and re-emit the text in the user's specified output format.
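*
* For example (illustrative values), the mysqldump field list
*   1,'alice','2010-01-01'
* re-emitted with a tab field delimiter and no enclosing character becomes
*   1<TAB>alice<TAB>2010-01-01
* (the single quotes are stripped by the parser and re-added only if the user
* requested an enclosing character).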
*/
public static class ReparsingAsyncSink extends ErrorableAsyncSink {
private final MySQLDumpMapper.Context context;
private final Configuration conf;
private final PerfCounters counters;
protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
final Configuration conf, final PerfCounters ctrs) {
this.context = c;
this.conf = conf;
this.counters = ctrs;
}
public void processStream(InputStream is) {
child = new ReparsingStreamThread(is, context, conf, counters);
child.start();
}
private static class ReparsingStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(
ReparsingStreamThread.class.getName());
private final MySQLDumpMapper.Context context;
private final Configuration conf;
private final InputStream stream;
private final PerfCounters counters;
ReparsingStreamThread(final InputStream is,
final MySQLDumpMapper.Context c, Configuration conf,
final PerfCounters ctrs) {
this.context = c;
this.conf = conf;
this.stream = is;
this.counters = ctrs;
}
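// These constants document mysqldump's output format: fields separated by
// commas, enclosed in single quotes, escaped with backslashes, one record
// per line. DelimiterSet.MYSQL_DELIMITERS describes the same format and is
// what MYSQLDUMP_PARSER is built from below.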
private static final char MYSQL_FIELD_DELIM = ',';
private static final char MYSQL_RECORD_DELIM = '\n';
private static final char MYSQL_ENCLOSE_CHAR = '\'';
private static final char MYSQL_ESCAPE_CHAR = '\\';
private static final boolean MYSQL_ENCLOSE_REQUIRED = false;
private static final RecordParser MYSQLDUMP_PARSER;
static {
// build a record parser for mysqldump's format
MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
}
public void run() {
BufferedReader r = null;
try {
r = new BufferedReader(new InputStreamReader(this.stream));
// Configure the output with the user's delimiters.
char outputFieldDelim = (char) conf.getInt(
MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
DelimiterSet.NULL_CHAR);
String outputFieldDelimStr = "" + outputFieldDelim;
char outputRecordDelim = (char) conf.getInt(
MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
DelimiterSet.NULL_CHAR);
String outputRecordDelimStr = "" + outputRecordDelim;
char outputEnclose = (char) conf.getInt(
MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
DelimiterSet.NULL_CHAR);
char outputEscape = (char) conf.getInt(
MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
DelimiterSet.NULL_CHAR);
boolean outputEncloseRequired = conf.getBoolean(
MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);
DelimiterSet delimiters = new DelimiterSet(
outputFieldDelim,
outputRecordDelim,
outputEnclose,
outputEscape,
outputEncloseRequired);
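// For example (illustrative): an import run with --fields-terminated-by '\t'
// and --lines-terminated-by '\n' produces a DelimiterSet equivalent to
// new DelimiterSet('\t', '\n', DelimiterSet.NULL_CHAR,
//     DelimiterSet.NULL_CHAR, false).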
// Actually do the read/write transfer loop here.
int preambleLen = -1; // set to this for "undefined"
while (true) {
String inLine = r.readLine();
if (null == inLine) {
break; // EOF.
}
if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
continue; // comments and empty lines are ignored
}
// This line is of the form "INSERT .. VALUES ( actual value text );".
// Strip the leading preamble up to the '(' and the trailing ');'.
if (preambleLen == -1) {
// we haven't determined how long the preamble is. It's constant
// across all lines, so just figure this out once.
String recordStartMark = "VALUES (";
preambleLen = inLine.indexOf(recordStartMark)
+ recordStartMark.length();
}
// Wrap the input string in a char buffer that ignores the leading
// and trailing text.
CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
inLine.length() - 2);
// Pass this along to the parser
List<String> fields = null;
try {
fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
} catch (RecordParser.ParseError pe) {
LOG.warn("ParseError reading from mysqldump: "
+ pe.toString() + "; record skipped");
continue; // Skip emitting this row.
}
// For all of the output fields, emit them using the delimiters
// the user chooses.
boolean first = true;
StringBuilder sb = new StringBuilder();
int recordLen = 1; // for the delimiter.
for (String field : fields) {
if (!first) {
sb.append(outputFieldDelimStr);
} else {
first = false;
}
String fieldStr = FieldFormatter.escapeAndEnclose(field,
delimiters);
sb.append(fieldStr);
recordLen += fieldStr.length();
}
sb.append(outputRecordDelimStr);
context.write(sb.toString(), null);
counters.addBytes(recordLen);
}
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
setError();
} catch (InterruptedException ie) {
LOG.error("InterruptedException reading from mysqldump: "
+ ie.toString());
// flag this error so we get an error status back in the caller.
setError();
} finally {
if (null != r) {
try {
r.close();
} catch (IOException ioe) {
LOG.info("Error closing FIFO stream: " + ioe.toString());
}
}
}
}
}
}
// TODO(aaron): Refactor this method to be much shorter.
// CHECKSTYLE:OFF
/**
* Import the table into HDFS by using mysqldump to pull out the data from
* the database and upload the files directly to HDFS.
*/
public void map(String splitConditions, NullWritable val, Context context)
throws IOException, InterruptedException {
LOG.info("Beginning mysqldump fast path import");
ArrayList<String> args = new ArrayList<String>();
String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);
// We need to parse the connect string URI to determine the database name,
// host, and port. Using java.net.URL directly on the connect string will
// fail because Java doesn't recognize arbitrary JDBC-based schemes, so
// JdbcUrl chops off the scheme (everything before '://') and replaces it
// with 'http', which is known to parse, before extracting the components.
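// For example (illustrative), a connect string of
//   jdbc:mysql://db.example.com:3306/sales
// yields databaseName "sales", hostname "db.example.com", and port 3306.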
String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
String databaseName = JdbcUrl.getDatabaseName(connectString);
String hostname = JdbcUrl.getHostName(connectString);
int port = JdbcUrl.getPort(connectString);
if (null == databaseName) {
throw new IOException("Could not determine database name");
}
LOG.info("Performing import of table " + tableName + " from database "
+ databaseName);
args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.
String password = DBConfiguration.getPassword((JobConf) conf);
String passwordFile = null;
Process p = null;
AsyncSink sink = null;
AsyncSink errSink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
if (null != password && password.length() > 0) {
passwordFile = MySQLUtils.writePasswordFile(conf);
args.add("--defaults-file=" + passwordFile);
}
// Don't use the --where="<whereClause>" version because spaces in it can
// confuse Java, and adding in surrounding quotes confuses Java as well.
String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
+ " AND (" + splitConditions + ")";
args.add("-w");
args.add(whereClause);
args.add("--host=" + hostname);
if (-1 != port) {
args.add("--port=" + Integer.toString(port));
}
args.add("--skip-opt");
args.add("--compact");
args.add("--no-create-db");
args.add("--no-create-info");
args.add("--quick"); // no buffering
args.add("--single-transaction");
String username = conf.get(MySQLUtils.USERNAME_KEY);
if (null != username) {
args.add("--user=" + username);
}
// If the user supplied extra args, add them here.
String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
if (null != extra) {
for (String arg : extra) {
args.add(arg);
}
}
args.add(databaseName);
args.add(tableName);
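// At this point the argument list corresponds to a command line roughly like
// the following (illustrative; optional arguments depend on the job config):
//   mysqldump --defaults-file=<passwordFile> -w "(1=1) AND (<splitConditions>)"
//       --host=<hostname> --port=<port> --skip-opt --compact --no-create-db
//       --no-create-info --quick --single-transaction --user=<username>
//       [extra args] <databaseName> <tableName>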
// begin the import in an external process.
LOG.debug("Starting mysqldump with arguments:");
for (String arg : args) {
LOG.debug(" " + arg);
}
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
if (MySQLUtils.outputDelimsAreMySQL(conf)) {
LOG.debug("Output delimiters conform to mysqldump; "
+ "using straight copy");
sink = new CopyingAsyncSink(context, counters);
} else {
LOG.debug("User-specified delimiters; using reparsing import");
LOG.info("Converting data to use specified delimiters.");
LOG.info("(For the fastest possible import, use");
LOG.info("--mysql-delimiters to specify the same field");
LOG.info("delimiters as are used by mysqldump.)");
sink = new ReparsingAsyncSink(context, conf, counters);
}
// Start an async thread to read and upload the whole stream.
counters.startClock();
sink.processStream(is);
// Start an async thread to send stderr to log4j.
errSink = new LoggingAsyncSink(LOG);
errSink.processStream(p.getErrorStream());
} finally {
// block until the process is done.
int result = 0;
if (null != p) {
while (true) {
try {
result = p.waitFor();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Remove the password file.
if (null != passwordFile) {
if (!new File(passwordFile).delete()) {
LOG.error("Could not remove mysql password file " + passwordFile);
LOG.error("You should remove this file to protect your credentials.");
}
}
// block until the stream sink is done too.
int streamResult = 0;
if (null != sink) {
while (true) {
try {
streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
}
break;
}
}
// Try to wait for stderr to finish, but regard any errors as advisory.
if (null != errSink) {
try {
if (0 != errSink.join()) {
LOG.info("Encountered exception reading stderr stream");
}
} catch (InterruptedException ie) {
LOG.info("Thread interrupted waiting for stderr to complete: "
+ ie.toString());
}
}
LOG.info("Transfer loop complete.");
if (0 != result) {
throw new IOException("mysqldump terminated with status "
+ Integer.toString(result));
}
if (0 != streamResult) {
throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
LOG.info("Transferred " + counters.toString());
}
}
// CHECKSTYLE:ON
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
this.conf = context.getConfiguration();
}
}