// blob: 422653eb51f59890783f094f0425b3475068f2a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce;
import java.io.IOException;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.SynchronousQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.lib.SqoopRecord;
/**
* Abstract OutputFormat class that allows the RecordWriter to buffer
* up SQL commands which should be executed in a separate thread after
* enough commands are created.
*
* This supports a configurable "spill threshold" at which
* point intermediate transactions are committed.
*
* Uses DBOutputFormat/DBConfiguration for configuring the output.
* This is used in conjunction with the abstract AsyncSqlRecordWriter
* class.
*
* Clients of this OutputFormat must implement getRecordWriter(); the
* returned RecordWriter is intended to subclass AsyncSqlRecordWriter.
*/
public abstract class AsyncSqlOutputFormat<K extends SqoopRecord, V>
extends OutputFormat<K, V> {
/** Configuration key: number of rows to export per INSERT statement. */
public static final String RECORDS_PER_STATEMENT_KEY =
"sqoop.export.records.per.statement";
/**
 * Configuration key: number of INSERT statements to bundle per transaction.
 * If this is set to -1 ({@link #UNLIMITED_STATEMENTS_PER_TRANSACTION}),
 * then a single transaction will be used per task. Note that each
 * statement may encompass multiple rows, depending on the value of
 * sqoop.export.records.per.statement.
 */
public static final String STATEMENTS_PER_TRANSACTION_KEY =
"sqoop.export.statements.per.transaction";
/**
 * Default number of records to put in an INSERT statement or
 * other batched update statement.
 */
public static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
/**
 * Default number of statements to execute before committing the
 * current transaction.
 */
public static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
/**
 * Value for {@link #STATEMENTS_PER_TRANSACTION_KEY} signifying that we
 * should not commit until the RecordWriter is being closed, regardless of
 * the number of statements we execute.
 */
public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
/** Logger shared by this class and its nested executor thread. */
private static final Log LOG = LogFactory.getLog(AsyncSqlOutputFormat.class);
/**
 * {@inheritDoc}
 *
 * <p>This implementation performs no checks and always returns normally.
 */
@Override
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
// No output-specification checks are performed here.
}
/**
 * {@inheritDoc}
 *
 * <p>This implementation always returns a new {@code NullOutputCommitter}.
 */
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new NullOutputCommitter();
}
/**
 * Represents a database update operation that should be performed
 * by an asynchronous background thread.
 * AsyncDBOperation objects are immutable.
 *
 * <p>An operation MAY contain a statement which should be executed; the
 * statement may also be null.
 *
 * <p>An operation additionally carries two independent flags: whether the
 * executor should commit the current transaction after handling it (even
 * if the statement is null), and whether the executor thread should stop
 * after handling it.
 */
public static class AsyncDBOperation {
  /** Statement to execute, or null if there is nothing to execute. */
  private final PreparedStatement stmt;
  /** True if {@link #stmt} should be executed as a batch. */
  private final boolean isBatch;
  /** True if the current transaction should be committed. */
  private final boolean commit;
  /** True if the executor thread should stop after this operation. */
  private final boolean stopThread;

  /**
   * Create an asynchronous database operation using a single combined
   * "commit and stop" flag.
   * @param s the statement, if any, to execute.
   * @param commitAndClose if true, the operation both requires a commit
   * and stops the executor thread; if false, it does neither.
   * @param batch is true if this is a batch PreparedStatement, or false
   * if it's a normal singleton statement.
   * @deprecated Use {@link #AsyncDBOperation(PreparedStatement, boolean,
   * boolean, boolean)} instead, which takes the commit and stop flags
   * separately.
   */
  @Deprecated
  public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
      boolean batch) {
    this(s, batch, commitAndClose, commitAndClose);
  }

  /**
   * Create an asynchronous database operation.
   * @param s the statement, if any, to execute.
   * @param batch is true if this is a batch PreparedStatement, or false
   * if it's a normal singleton statement.
   * @param commit is true if this statement should be committed to the
   * database.
   * @param stopThread if true, the executor thread should stop after this
   * operation.
   */
  public AsyncDBOperation(PreparedStatement s, boolean batch,
      boolean commit, boolean stopThread) {
    this.stmt = s;
    this.isBatch = batch;
    this.commit = commit;
    this.stopThread = stopThread;
  }

  /**
   * @return a statement to run as an update; may be null.
   */
  public PreparedStatement getStatement() {
    return this.stmt;
  }

  /**
   * @return true if the executor should commit the current transaction.
   * If getStatement() is non-null, the statement is run first.
   */
  public boolean requiresCommit() {
    return this.commit;
  }

  /**
   * @return true if the executor should stop after this command.
   */
  public boolean stop() {
    return this.stopThread;
  }

  /**
   * @return true if this is a batch SQL statement.
   */
  public boolean execAsBatch() {
    return this.isBatch;
  }
}
/**
 * A thread that runs the database interactions asynchronously
 * from the OutputCollector.
 *
 * <p>Operations are handed over one at a time through {@link #put}; the
 * thread executes each one, commits when requested or when the
 * per-transaction statement limit is reached, and records the first
 * unreported SQLException for retrieval via {@link #getLastError()}.
 */
public static class AsyncSqlExecThread extends Thread {
  /** The connection to the database. */
  private final Connection conn;

  /** Error from a previously-run statement; accessed via synchronized methods. */
  private SQLException err;

  /** How we receive database operations from the RecordWriter. */
  private final SynchronousQueue<AsyncDBOperation> opsQueue;

  /** Statements executed thus far in the current transaction. */
  protected int curNumStatements;

  /** Statements per transaction. */
  protected final int stmtsPerTx;

  /**
   * Create a new update thread that interacts with the database.
   * @param conn the connection to use. Statement execution and commits
   * on it are performed by this thread while holding the connection's
   * monitor.
   * @param stmtsPerTx the number of statements to execute before committing
   * the current transaction.
   */
  public AsyncSqlExecThread(Connection conn, int stmtsPerTx) {
    this.conn = conn;
    this.err = null;
    this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
    this.stmtsPerTx = stmtsPerTx;
  }

  /**
   * Takes operations from the queue and executes them until an operation
   * whose {@code stop()} returns true has been handled.
   * SQLExceptions are recorded through setLastError() rather than thrown.
   */
  @Override
  public void run() {
    while (true) {
      AsyncDBOperation op = null;
      try {
        op = opsQueue.take();
      } catch (InterruptedException ie) {
        // An interrupt is logged and otherwise ignored: the loop only
        // ends once an operation with stop() == true has been processed.
        LOG.warn("Interrupted retrieving from operation queue: "
            + StringUtils.stringifyException(ie));
        continue;
      }

      if (null == op) {
        // This shouldn't be allowed to happen.
        LOG.warn("Null operation in queue; illegal state.");
        continue;
      }

      PreparedStatement stmt = op.getStatement();
      // Synchronize on the connection to ensure it does not conflict
      // with the prepareStatement() call in the main thread.
      synchronized (conn) {
        try {
          if (null != stmt) {
            if (op.execAsBatch()) {
              stmt.executeBatch();
            } else {
              stmt.execute();
            }
            stmt.close();
            stmt = null; // Already closed; skip the close in finally.
            this.curNumStatements++;
          }

          if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
              && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
            LOG.debug("Committing transaction of " + curNumStatements
                + " statements");
            this.conn.commit();
            this.curNumStatements = 0;
          }
        } catch (BatchUpdateException batchE) {
          if (batchE.getNextException() != null) {
            // If a statement in a batch causes an SQLException, the
            // database can either set it as the cause of the
            // BatchUpdateException, or set it as the 'next' field of the
            // BatchUpdateException (e.g. HSQLDB 1.8 does the former and
            // Postgres 8.4 does the latter). Prefer the 'next' exception
            // when one is present.
            setLastError(batchE.getNextException());
          } else {
            // Same as the SQLException block.
            setLastError(batchE);
          }
        } catch (SQLException sqlE) {
          setLastError(sqlE);
        } finally {
          // Close the statement on our way out if that didn't happen
          // via the normal execution path.
          if (null != stmt) {
            try {
              stmt.close();
            } catch (SQLException sqlE) {
              setLastError(sqlE);
            }
          }
        }
      } // synchronized (conn)

      // Always check whether we should end the loop, regardless of
      // whether an SQLException was recorded above. This check lives
      // outside the finally block so that an unchecked exception or
      // error in flight is not silently discarded by a return.
      if (op.stop()) {
        return;
      }
    }
  }

  /**
   * Allows a user to enqueue the next database operation to run.
   * Since the connection can only execute a single operation at a time,
   * the put() method may block if another operation is already underway.
   * @param op the database operation to perform.
   * @throws InterruptedException if interrupted while waiting to hand
   * the operation over to the executor thread.
   */
  public void put(AsyncDBOperation op) throws InterruptedException {
    opsQueue.put(op);
  }

  /**
   * Returns the error, if any, recorded from a previously-executed
   * statement, and clears the error slot. While the slot is filled,
   * subsequent errors are logged and discarded until this method is
   * called.
   * @return any SQLException that occurred due to a previously-run
   * statement, or null if no error is pending.
   */
  public synchronized SQLException getLastError() {
    SQLException e = this.err;
    this.err = null;
    return e;
  }

  /**
   * Records an error in the error slot if the slot is empty; otherwise
   * logs the error and discards it.
   * @param e the exception raised in the update thread.
   */
  private synchronized void setLastError(SQLException e) {
    if (this.err == null) {
      // Just set it.
      LOG.error("Got exception in update thread: "
          + StringUtils.stringifyException(e));
      this.err = e;
    } else {
      // Slot is full. Log it and discard.
      LOG.error("SQLException in update thread but error slot full: "
          + StringUtils.stringifyException(e));
    }
  }
}
}