| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.mapreduce; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.ArrayList; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.sqoop.util.AsyncSink; |
| import org.apache.sqoop.util.JdbcUrl; |
| import org.apache.sqoop.util.LoggingAsyncSink; |
| import org.apache.sqoop.util.NullAsyncSink; |
| import org.apache.sqoop.util.TaskId; |
| import org.apache.sqoop.io.NamedFifo; |
| import org.apache.sqoop.mapreduce.db.DBConfiguration; |
| import org.apache.sqoop.manager.MySQLUtils; |
| |
| /** |
| * Mapper that starts a 'mysqlimport' process and uses that to export rows from |
| * HDFS to a MySQL database at high speed. |
| * |
| * map() methods are actually provided by subclasses that read from |
| * SequenceFiles (containing existing SqoopRecords) or text files |
| * (containing delimited lines) and deliver these results to the fifo |
| * used to interface with mysqlimport. |
| */ |
| public class MySQLExportMapper<KEYIN, VALIN> |
| extends SqoopMapper<KEYIN, VALIN, NullWritable, NullWritable> { |
| |
| public static final Log LOG = LogFactory.getLog( |
| MySQLExportMapper.class.getName()); |
| |
| /** Configuration key that specifies the number of bytes before which it |
| * commits the current export transaction and opens a new one. |
| * Default is 32 MB; setting this to 0 will use no checkpoints. |
| */ |
| public static final String MYSQL_CHECKPOINT_BYTES_KEY = |
| "sqoop.mysql.export.checkpoint.bytes"; |
| |
| public static final long DEFAULT_CHECKPOINT_BYTES = 32 * 1024 * 1024; |
| |
| // Configured value for MSYQL_CHECKPOINT_BYTES_KEY. |
| protected long checkpointDistInBytes; |
| |
| /** Configuration key that specifies the number of milliseconds |
| * to sleep at the end of each checkpoint commit |
| * Default is 0, no sleep. |
| */ |
| public static final String MYSQL_CHECKPOINT_SLEEP_KEY = |
| "sqoop.mysql.export.sleep.ms"; |
| |
| public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0; |
| |
| // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY. |
| protected long checkpointSleepMs; |
| |
| protected Configuration conf; |
| |
| /** The FIFO being used to communicate with mysqlimport. */ |
| protected File fifoFile; |
| |
| /** The process object representing the active connection to mysqlimport. */ |
| protected Process mysqlImportProcess; |
| |
| /** The stream to write to stdin for mysqlimport. */ |
| protected OutputStream importStream; |
| |
| // Handlers for stdout and stderr from mysqlimport. |
| protected AsyncSink outSink; |
| protected AsyncSink errSink; |
| |
| /** File object where we wrote the user's password to pass to mysqlimport. */ |
| protected File passwordFile; |
| |
| /** Character set used to write to mysqlimport. */ |
| protected String mysqlCharSet; |
| |
| /** |
| * Tally of bytes written to current mysqlimport instance. |
| * We commit an interim tx and open a new mysqlimport after this |
| * gets too big. */ |
| private long bytesWritten; |
| |
| /** |
| * Create a named FIFO, and start mysqlimport connected to that FIFO. |
| * A File object representing the FIFO is in 'fifoFile'. |
| */ |
| private void initMySQLImportProcess() throws IOException { |
| File taskAttemptDir = TaskId.getLocalWorkPath(conf); |
| |
| this.fifoFile = new File(taskAttemptDir, |
| conf.get(MySQLUtils.TABLE_NAME_KEY, "UNKNOWN_TABLE") + ".txt"); |
| String filename = fifoFile.toString(); |
| |
| // Create the FIFO itself. |
| try { |
| new NamedFifo(this.fifoFile).create(); |
| } catch (IOException ioe) { |
| // Command failed. |
| LOG.error("Could not mknod " + filename); |
| this.fifoFile = null; |
| throw new IOException( |
| "Could not create FIFO to interface with mysqlimport", ioe); |
| } |
| |
| // Now open the connection to mysqlimport. |
| ArrayList<String> args = new ArrayList<String>(); |
| |
| String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY); |
| String databaseName = JdbcUrl.getDatabaseName(connectString); |
| String hostname = JdbcUrl.getHostName(connectString); |
| int port = JdbcUrl.getPort(connectString); |
| |
| if (null == databaseName) { |
| throw new IOException("Could not determine database name"); |
| } |
| |
| args.add(MySQLUtils.MYSQL_IMPORT_CMD); // needs to be on the path. |
| String password = DBConfiguration.getPassword((JobConf) conf); |
| |
| if (null != password && password.length() > 0) { |
| passwordFile = new File(MySQLUtils.writePasswordFile(conf)); |
| args.add("--defaults-file=" + passwordFile); |
| } |
| |
| String username = conf.get(MySQLUtils.USERNAME_KEY); |
| if (null != username) { |
| args.add("--user=" + username); |
| } |
| |
| args.add("--host=" + hostname); |
| if (-1 != port) { |
| args.add("--port=" + Integer.toString(port)); |
| } |
| |
| args.add("--compress"); |
| args.add("--local"); |
| args.add("--silent"); |
| |
| // Specify the subset of columns we're importing. |
| DBConfiguration dbConf = new DBConfiguration(conf); |
| String [] cols = dbConf.getInputFieldNames(); |
| if (null != cols) { |
| StringBuilder sb = new StringBuilder(); |
| boolean first = true; |
| for (String col : cols) { |
| if (!first) { |
| sb.append(","); |
| } |
| sb.append(col); |
| first = false; |
| } |
| |
| args.add("--columns=" + sb.toString()); |
| } |
| |
| // Specify the delimiters to use. |
| int outputFieldDelim = conf.getInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY, |
| (int) ','); |
| int outputRecordDelim = conf.getInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY, |
| (int) '\n'); |
| int enclosedBy = conf.getInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY, 0); |
| int escapedBy = conf.getInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY, 0); |
| boolean encloseRequired = conf.getBoolean( |
| MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false); |
| |
| args.add("--fields-terminated-by=0x" |
| + Integer.toString(outputFieldDelim, 16)); |
| args.add("--lines-terminated-by=0x" |
| + Integer.toString(outputRecordDelim, 16)); |
| if (0 != enclosedBy) { |
| if (encloseRequired) { |
| args.add("--fields-enclosed-by=0x" + Integer.toString(enclosedBy, 16)); |
| } else { |
| args.add("--fields-optionally-enclosed-by=0x" |
| + Integer.toString(enclosedBy, 16)); |
| } |
| } |
| |
| if (0 != escapedBy) { |
| args.add("--fields-escaped-by=0x" + Integer.toString(escapedBy, 16)); |
| } |
| |
| // These two arguments are positional and must be last. |
| args.add(databaseName); |
| args.add(filename); |
| |
| // Begin the export in an external process. |
| LOG.debug("Starting mysqlimport with arguments:"); |
| for (String arg : args) { |
| LOG.debug(" " + arg); |
| } |
| |
| // Actually start mysqlimport. |
| mysqlImportProcess = Runtime.getRuntime().exec(args.toArray(new String[0])); |
| |
| // Log everything it writes to stderr. |
| // Ignore anything on stdout. |
| this.outSink = new NullAsyncSink(); |
| this.outSink.processStream(mysqlImportProcess.getInputStream()); |
| |
| this.errSink = new LoggingAsyncSink(LOG); |
| this.errSink.processStream(mysqlImportProcess.getErrorStream()); |
| |
| // Open the named FIFO after starting mysqlimport. |
| this.importStream = new BufferedOutputStream( |
| new FileOutputStream(fifoFile)); |
| |
| // At this point, mysqlimport is running and hooked up to our FIFO. |
| // The mapper just needs to populate it with data. |
| |
| this.bytesWritten = 0; |
| } |
| |
| @Override |
| public void run(Context context) throws IOException, InterruptedException { |
| this.conf = context.getConfiguration(); |
| setup(context); |
| initMySQLImportProcess(); |
| try { |
| while (context.nextKeyValue()) { |
| map(context.getCurrentKey(), context.getCurrentValue(), context); |
| } |
| cleanup(context); |
| } finally { |
| // Shut down the mysqlimport process. |
| closeExportHandles(); |
| } |
| } |
| |
| private void closeExportHandles() throws IOException, InterruptedException { |
| int ret = 0; |
| if (null != this.importStream) { |
| // Close the stream that writes to mysqlimport's stdin first. |
| LOG.debug("Closing import stream"); |
| this.importStream.close(); |
| this.importStream = null; |
| } |
| |
| if (null != this.mysqlImportProcess) { |
| // We started mysqlimport; wait for it to finish. |
| LOG.info("Waiting for mysqlimport to complete"); |
| ret = this.mysqlImportProcess.waitFor(); |
| LOG.info("mysqlimport closed connection"); |
| this.mysqlImportProcess = null; |
| } |
| |
| if (null != this.passwordFile && this.passwordFile.exists()) { |
| if (!this.passwordFile.delete()) { |
| LOG.error("Could not remove mysql password file " + passwordFile); |
| LOG.error("You should remove this file to protect your credentials."); |
| } |
| |
| this.passwordFile = null; |
| } |
| |
| // Finish processing any output from mysqlimport. |
| // This is informational only, so we don't care about return codes. |
| if (null != outSink) { |
| LOG.debug("Waiting for any additional stdout from mysqlimport"); |
| outSink.join(); |
| outSink = null; |
| } |
| |
| if (null != errSink) { |
| LOG.debug("Waiting for any additional stderr from mysqlimport"); |
| errSink.join(); |
| errSink = null; |
| } |
| |
| if (this.fifoFile != null && this.fifoFile.exists()) { |
| // Clean up the resources we created. |
| LOG.debug("Removing fifo file"); |
| if (!this.fifoFile.delete()) { |
| LOG.error("Could not clean up named FIFO after completing mapper"); |
| } |
| |
| // We put the FIFO file in a one-off subdir. Remove that. |
| File fifoParentDir = this.fifoFile.getParentFile(); |
| LOG.debug("Removing task attempt tmpdir"); |
| if (!fifoParentDir.delete()) { |
| LOG.error("Could not clean up task dir after completing mapper"); |
| } |
| |
| this.fifoFile = null; |
| } |
| |
| if (0 != ret) { |
| // Don't mark the task as successful if mysqlimport returns an error. |
| throw new IOException("mysqlimport terminated with error code " + ret); |
| } |
| } |
| |
| @Override |
| protected void setup(Context context) { |
| this.conf = context.getConfiguration(); |
| |
| // TODO: Support additional encodings. |
| this.mysqlCharSet = MySQLUtils.MYSQL_DEFAULT_CHARSET; |
| |
| this.checkpointDistInBytes = conf.getLong( |
| MYSQL_CHECKPOINT_BYTES_KEY, DEFAULT_CHECKPOINT_BYTES); |
| if (this.checkpointDistInBytes < 0) { |
| LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY); |
| this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES; |
| } |
| |
| this.checkpointSleepMs = conf.getLong( |
| MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS); |
| |
| if (this.checkpointSleepMs < 0) { |
| LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY); |
| this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS; |
| } |
| |
| if (this.checkpointSleepMs >= conf.getLong("mapred.task.timeout", 0)) { |
| LOG.warn("Value for " |
| + MYSQL_CHECKPOINT_SLEEP_KEY |
| + " has to be smaller than mapred.task.timeout"); |
| this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS; |
| } |
| } |
| |
| /** |
| * Takes a delimited text record (e.g., the output of a 'Text' object), |
| * re-encodes it for consumption by mysqlimport, and writes it to the pipe. |
| * @param record A delimited text representation of one record. |
| * @param terminator an optional string that contains delimiters that |
| * terminate the record (if not included in 'record' itself). |
| */ |
| protected void writeRecord(String record, String terminator) |
| throws IOException, InterruptedException { |
| |
| // We've already set up mysqlimport to accept the same delimiters, |
| // so we don't need to convert those. But our input text is UTF8 |
| // encoded; mysql allows configurable encoding, but defaults to |
| // latin-1 (ISO8859_1). We'll convert to latin-1 for now. |
| // TODO: Support user-configurable encodings. |
| |
| byte [] mysqlBytes = record.getBytes(this.mysqlCharSet); |
| this.importStream.write(mysqlBytes, 0, mysqlBytes.length); |
| this.bytesWritten += mysqlBytes.length; |
| |
| if (null != terminator) { |
| byte [] termBytes = terminator.getBytes(this.mysqlCharSet); |
| this.importStream.write(termBytes, 0, termBytes.length); |
| this.bytesWritten += termBytes.length; |
| } |
| |
| // If bytesWritten is too big, then we should start a new tx by closing |
| // mysqlimport and opening a new instance of the process. |
| if (this.checkpointDistInBytes != 0 |
| && this.bytesWritten > this.checkpointDistInBytes) { |
| LOG.info("Checkpointing current export."); |
| |
| if (this.checkpointSleepMs != 0) { |
| LOG.info("Pausing."); |
| Thread.sleep(this.checkpointSleepMs); |
| } |
| |
| closeExportHandles(); |
| initMySQLImportProcess(); |
| this.bytesWritten = 0; |
| } |
| } |
| } |
| |