blob: c879c709a8e1995faddbe09cf2036085367955d6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.sqoop.mapreduce.sqlserver.SqlServerRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.lib.SqoopRecord;
/**
 * A RecordReader that reads records from a SQL Server table.
 *
 * <p>This record reader handles connection failures using the configured
 * connection failure handler ({@link SQLFailureHandler}). It remembers the
 * split-by column value of the last record handed out by
 * {@link #getCurrentValue()}; once such a value exists,
 * {@link #getSelectQuery()} builds a query that starts after that record.</p>
 */
public class SQLServerDBRecordReader<T extends SqoopRecord> extends
    SqlServerRecordReader<T> {

  private static final Log LOG =
      LogFactory.getLog(SQLServerDBRecordReader.class);

  // The SQL handler to be used for recovering failed read operations
  protected SQLFailureHandler failureHandler = null;

  // Maximum number of times a failed read is retried after the connection
  // has been recovered (one initial attempt plus RETRY_MAX retries).
  protected static final int RETRY_MAX = 3;

  // Name of the split column (without [] escaping) used to look up the
  // split-by value in each record, so selectQueries can be re-generated
  // after connection failures.
  private String splitColumn;

  // String form of the split-by column value of the last record returned by
  // getCurrentValue(); null until such a value has been captured.
  private String lastRecordValue;

  /**
   * Creates a record reader for the given split. All arguments except
   * {@code dbProduct} are passed unchanged to the parent reader.
   *
   * @param split the input split to read
   * @param inputClass the generated record class
   * @param conf the job configuration
   * @param conn the database connection to read from
   * @param dbConfig the database configuration
   * @param cond condition passed through to the parent reader
   * @param fields the columns to select
   * @param table the table to read from
   * @param dbProduct database product name; not used by this reader
   * @throws SQLException propagated from the parent constructor
   */
  public SQLServerDBRecordReader(DBInputFormat.DBInputSplit split,
      Class<T> inputClass, Configuration conf, Connection conn,
      DBConfiguration dbConfig, String cond, String[] fields, String table,
      String dbProduct) throws SQLException {
    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
  }

  /**
   * {@inheritDoc}
   *
   * <p>Also records the split-by column value of the returned record so that
   * a re-generated select query can start after it.</p>
   */
  @Override
  public T getCurrentValue() {
    T val = currentValue();
    saveCurrentValue(val);
    return val;
  }

  /** Returns the current value as reported by the parent reader. */
  T currentValue() {
    return super.getCurrentValue();
  }

  /**
   * Remembers the split-by column value of {@code value} as the last record
   * value. A {@code null} record (nothing read yet) leaves the saved value
   * unchanged.
   */
  void saveCurrentValue(T value) {
    if (value == null) {
      return;
    }
    lastRecordValue = getCurrentValueOfSplitByColumnFromORM(value, splitColumn);
  }

  /**
   * Returns the string form of {@code columnName}'s value in the generated
   * record, or {@code null} if the column is unknown or its value is null.
   */
  private String getCurrentValueOfSplitByColumnFromORM(T generatedORMRecord,
      String columnName) {
    if (columnName == null) {
      // initialize() has not resolved the split column yet.
      return null;
    }
    // Fetch the field map once; it is consulted for both lookups below.
    Map<String, Object> fieldMap = generatedORMRecord.getFieldMap();
    if (fieldMap.containsKey(columnName)) {
      Object result = fieldMap.get(columnName);
      return result != null ? result.toString() : null;
    }
    return getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(
        fieldMap, columnName);
  }

  /*
   * SQOOP-3139: It is a workaround if the database/table/column is used in
   * case insensitive mode and the user uses Sqoop import with --split-by but
   * the given parameter doesn't match the column name case-sensitively,
   * e.g. columnName.equals(splitBy) is false but
   * columnName.equalsIgnoreCase(splitBy) is true.
   */
  private String getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(
      Map<String, Object> fieldMap, String columnName) {
    for (Map.Entry<String, Object> field : fieldMap.entrySet()) {
      if (columnName.equalsIgnoreCase(field.getKey())) {
        Object value = field.getValue();
        return value != null ? value.toString() : null;
      }
    }
    return null;
  }

  /** Returns the saved split-by value of the last record, or null. */
  String getLastRecordValue() {
    return lastRecordValue;
  }

  /**
   * Loads the SQLFailureHandler configured for use by the record reader and
   * resolves the split-by column.
   *
   * @param inputSplit not used by this method
   * @param context not used by this method
   * @throws IOException if the configuration is missing, the handler class
   *     is not configured, cannot be loaded or is not a
   *     {@link SQLFailureHandler}, or no split column is set
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    Configuration conf = getConf();
    if (conf == null) {
      String error = "Configuration cannot be NULL";
      LOG.error(error);
      throw new IOException(error);
    }
    failureHandler = loadFailureHandler(conf);
    splitColumn = resolveSplitColumn();
  }

  /**
   * Instantiates the failure handler class named in {@code conf} and
   * initializes it with that configuration.
   */
  private SQLFailureHandler loadFailureHandler(Configuration conf)
      throws IOException {
    // Get the class-name set in configuration
    String className =
        conf.get(SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS);
    if (className == null) {
      String error = "No connection failure handler class configured: "
          + SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS;
      LOG.error(error);
      throw new IOException(error);
    }
    Class<?> connHandlerClass;
    try {
      connHandlerClass = conf.getClassByName(className);
    } catch (ClassNotFoundException ex) {
      LOG.error("Failed to find class: " + className);
      throw new IOException(ex);
    }
    // Verify handler class is a subclass of SQLFailureHandler
    if (!SQLFailureHandler.class.isAssignableFrom(connHandlerClass)) {
      String error = "A subclass of " + SQLFailureHandler.class.getName()
          + " is expected. Actual class set is: " + connHandlerClass.getName();
      LOG.error(error);
      throw new IOException(error);
    }
    LOG.trace("Using connection handler class: " + connHandlerClass);
    // asSubclass is safe here: assignability was checked just above.
    SQLFailureHandler handler = ReflectionUtils.newInstance(
        connHandlerClass.asSubclass(SQLFailureHandler.class), conf);
    // Initialize the connection handler using the job configuration
    handler.initialize(conf);
    return handler;
  }

  /**
   * Returns the configured split-by column with one pair of surrounding
   * square brackets removed, so it can be used to search the record map.
   */
  private String resolveSplitColumn() throws IOException {
    String column = getDBConf().getInputOrderBy();
    if (column == null || column.length() == 0) {
      throw new IOException("Split column must be set");
    }
    int len = column.length();
    if (len > 2 && column.charAt(0) == '[' && column.charAt(len - 1) == ']') {
      return column.substring(1, len - 1);
    }
    return column;
  }

  /**
   * Reads the next key, value pair.
   *
   * <p>If the parent read throws an {@link IOException} whose cause the
   * configured failure handler can handle, a recovered connection is
   * configured and installed before the read is retried. At most
   * {@link #RETRY_MAX} retries are performed.</p>
   *
   * @return the result of the parent reader's {@code nextKeyValue()}
   * @throws IOException if the handler cannot handle the failure, recovery
   *     fails, or all retry attempts are consumed (the last read failure is
   *     attached as the cause)
   */
  @Override
  public boolean nextKeyValue() throws IOException {
    IOException lastFailure = null;
    // One initial attempt plus RETRY_MAX retries.
    for (int attempt = 0; attempt <= RETRY_MAX; attempt++) {
      try {
        return super.nextKeyValue();
      } catch (IOException ioEx) {
        LOG.warn("Trying to recover from DB read failure: ", ioEx);
        lastFailure = ioEx;
        recoverConnection(ioEx);
      }
    }
    throw new IOException("Failed to read from database after "
        + RETRY_MAX + " retries.", lastFailure);
  }

  /**
   * Uses the configured failure handler to replace the connection after
   * {@code failure}; rethrows if the handler cannot handle its cause.
   */
  private void recoverConnection(IOException failure) throws IOException {
    Throwable cause = failure.getCause();
    if (!failureHandler.canHandleFailure(cause)) {
      // Cannot be recovered using the configured handler, re-throw
      throw new IOException("Connection handler cannot recover failure: ",
          failure);
    }
    Connection conn = failureHandler.recover();
    // Configure the new connection before using it
    configureConnection(conn);
    setConnection(conn);
  }

  /**
   * Configures the provided Connection for record reads: auto-commit off and
   * READ_COMMITTED isolation.
   *
   * @throws IOException wrapping any SQLException raised while configuring
   */
  protected void configureConnection(Connection conn) throws IOException {
    try {
      conn.setAutoCommit(false);
      conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
    } catch (SQLException sqlEx) {
      LOG.error("Failed to configure SQL Connection");
      throw new IOException(sqlEx);
    }
  }

  /**
   * Returns the query for selecting the records.
   *
   * <p>For handling connection recovery we always want to start from the
   * last record that was successfully read. When no split-by value has been
   * saved, the parent's query is returned unchanged. Otherwise the lower
   * clause becomes {@code <split-by column> > '<last value>'}, combined with
   * the split's upper clause.</p>
   */
  @Override
  protected String getSelectQuery() {
    if (lastRecordValue == null) {
      return super.getSelectQuery();
    }
    DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit =
        (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit();
    String lowerClause = getDBConf().getInputOrderBy() + " > "
        + toSqlStringLiteral(lastRecordValue);
    // Get the select query with the lowerClause, and split upper clause
    return getSelectQuery(lowerClause, dataSplit.getUpperClause());
  }

  /**
   * Quotes {@code value} as a SQL string literal, doubling embedded single
   * quotes so a value such as {@code O'Brien} cannot terminate the literal.
   */
  private static String toSqlStringLiteral(String value) {
    return "'" + value.replace("'", "''") + "'";
  }
}