| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.sqoop.mapreduce.db; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.sqoop.mapreduce.sqlserver.SqlServerRecordReader; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| |
| import org.apache.sqoop.lib.SqoopRecord; |
| |
| /** |
| * A RecordReader that reads records from a SQL table. |
| * This record reader handles connection failures using the configured |
| * connection failure handler |
| */ |
| public class SQLServerDBRecordReader<T extends SqoopRecord> extends |
| SqlServerRecordReader<T> { |
| |
| private static final Log LOG = |
| LogFactory.getLog(SQLServerDBRecordReader.class); |
| |
| // The SQL handler to be used for recovering failed read operations |
| protected SQLFailureHandler failureHandler = null; |
| |
| // Recover failed reads for RETRY_MAX |
| protected static final int RETRY_MAX = 3; |
| |
| // Name of the split column used to re-generate selectQueries after |
| // connection failures |
| private String splitColumn; |
| private String lastRecordValue; |
| |
| public SQLServerDBRecordReader(DBInputFormat.DBInputSplit split, |
| Class<T> inputClass, Configuration conf, Connection conn, |
| DBConfiguration dbConfig, String cond, String [] fields, String table, |
| String dbProduct) throws SQLException { |
| super(split, inputClass, conf, conn, dbConfig, cond, fields, table); |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public T getCurrentValue() { |
| T val = currentValue(); |
| |
| saveCurrentValue(val); |
| |
| return val; |
| } |
| |
| T currentValue() { |
| return super.getCurrentValue(); |
| } |
| |
| void saveCurrentValue(T value) { |
| lastRecordValue = getCurrentValueOfSplitByColumnFromORM(value, splitColumn); |
| } |
| |
| private String getCurrentValueOfSplitByColumnFromORM(T generatedORMRecord, String columnName) { |
| Object result = generatedORMRecord.getFieldMap().get(columnName); |
| if (result != null) { |
| return result.toString(); |
| } |
| return getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(generatedORMRecord, columnName); |
| } |
| |
| /* |
| * SQOOP-3139: It is a workaround if the database/table/column is used in case insensitive mode and the user |
| * uses Sqoop import with --split-by but the given parameter doesn't match with table name if it is case sensitive |
| * eg.: tableName.equals(split-by) doesn't match only if tableName.equalsIgnorecase(split-by) |
| * |
| */ |
| private String getCurrentValueOfSplitByColumnFromORMIfSplitByDoesNotMatch(T generatedORMRecord, String columnName) { |
| for (Map.Entry<String, Object> fields : generatedORMRecord.getFieldMap().entrySet()) { |
| if (columnName.equalsIgnoreCase(fields.getKey())) { |
| return fields.getValue() != null ? fields.getValue().toString() : null; |
| } |
| } |
| return null; |
| } |
| |
| String getLastRecordValue() { |
| return lastRecordValue; |
| } |
| |
| /** |
| * Load the SQLFailureHandler configured for use by the record reader. |
| */ |
| public void initialize(InputSplit inputSplit, TaskAttemptContext context) |
| throws IOException, InterruptedException { |
| // Load the configured connection failure handler |
| Configuration conf = getConf(); |
| if (conf == null) { |
| LOG.error("Configuration cannot be NULL"); |
| } |
| |
| Class connHandlerClass; |
| try { |
| String className = conf.get( |
| SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS); |
| |
| // Get the class-name set in configuration |
| connHandlerClass = conf.getClassByName(className); |
| } catch (ClassNotFoundException ex) { |
| LOG.error("Failed to find class: " |
| + SQLServerDBInputFormat.IMPORT_FAILURE_HANDLER_CLASS); |
| throw new IOException(ex); |
| } |
| |
| // Verify handler class is a subclass of SQLFailureHandler |
| if (!SQLFailureHandler.class.isAssignableFrom(connHandlerClass)) { |
| String error = "A subclass of " + SQLFailureHandler.class.getName() |
| + " is expected. Actual class set is: " + connHandlerClass.getName(); |
| LOG.error(error); |
| throw new IOException(error); |
| } |
| LOG.trace("Using connection handler class: " + connHandlerClass); |
| |
| // Load the configured connection failure handler |
| failureHandler = ReflectionUtils.newInstance( |
| (Class<? extends SQLFailureHandler>)connHandlerClass, conf); |
| |
| // Initialize the connection handler with using job configuration |
| failureHandler.initialize(conf); |
| |
| // Get the split-by column |
| splitColumn = getDBConf().getInputOrderBy(); |
| if (splitColumn == null || splitColumn.length() == 0) { |
| throw new IOException("Split column must be set"); |
| } |
| |
| // Ensure the split-column is not escaped so that we can use it to search |
| // in the record map |
| int splitColLen = splitColumn.length(); |
| if (splitColLen > 2 && splitColumn.charAt(0) == '[' |
| && splitColumn.charAt(splitColLen-1) == ']') { |
| splitColumn = splitColumn.substring(1, splitColLen - 1); |
| } |
| } |
| |
| @Override |
| /** |
| * Read the next key, value pair. |
| * Try to recover failed connections using the configured connection failure |
| * handler before retrying the failed operation |
| */ |
| public boolean nextKeyValue() throws IOException { |
| boolean valueReceived = false; |
| int retryCount = RETRY_MAX; |
| boolean doRetry = true; |
| |
| do { |
| try { |
| // Try to get the next key/value pairs |
| valueReceived = super.nextKeyValue(); |
| doRetry = false; |
| } catch (IOException ioEx) { |
| LOG.warn("Trying to recover from DB read failure: ", ioEx); |
| Throwable cause = ioEx.getCause(); |
| |
| // Use configured connection handler to recover from the connection |
| // failure and use the newly constructed connection. |
| // If the failure cannot be recovered, an exception is thrown |
| if (failureHandler.canHandleFailure(cause)) { |
| // Recover from connection failure |
| Connection conn = failureHandler.recover(); |
| |
| // Configure the new connection before using it |
| configureConnection(conn); |
| setConnection(conn); |
| |
| --retryCount; |
| doRetry = (retryCount >= 0); |
| } else { |
| // Cannot recovered using configured handler, re-throw |
| throw new IOException("Cannection handler cannot recover failure: ", |
| ioEx); |
| } |
| } |
| } while (doRetry); |
| |
| // Rethrow the exception if all retry attempts are consumed |
| if (retryCount < 0) { |
| throw new IOException("Failed to read from database after " |
| + RETRY_MAX + " retries."); |
| } |
| |
| return valueReceived; |
| } |
| |
| /** |
| * Configure the provided Connection for record reads. |
| */ |
| protected void configureConnection(Connection conn) throws IOException { |
| try { |
| conn.setAutoCommit(false); |
| conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); |
| } catch (SQLException sqlEx) { |
| LOG.error("Failed to configure SQL Connection"); |
| throw new IOException(sqlEx); |
| } |
| } |
| |
| /** Returns the query for selecting the records, |
| * For handling connection recovery we always want to start from the last |
| * record that was successfully read. |
| */ |
| protected String getSelectQuery() { |
| // Last seen record key is only expected to be unavailable if no reads |
| // ever happened |
| String selectQuery; |
| if (lastRecordValue == null) { |
| selectQuery = super.getSelectQuery(); |
| } else { |
| // If last record key is available, construct the select query to start |
| // from |
| DataDrivenDBInputFormat.DataDrivenDBInputSplit dataSplit = |
| (DataDrivenDBInputFormat.DataDrivenDBInputSplit) getSplit(); |
| StringBuilder lowerClause = new StringBuilder(); |
| lowerClause.append(getDBConf().getInputOrderBy()); |
| lowerClause.append(" > "); |
| lowerClause.append("'" + lastRecordValue.toString() + "'"); |
| |
| // Get the select query with the lowerClause, and split upper clause |
| selectQuery = getSelectQuery(lowerClause.toString(), |
| dataSplit.getUpperClause()); |
| } |
| |
| return selectQuery; |
| } |
| |
| } |