| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.sqoop.mapreduce.db; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configurable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.sqoop.mapreduce.DBWritable; |
| |
| import org.apache.sqoop.config.ConfigurationHelper; |
| |
| /** |
| * A InputFormat that reads input data from an SQL table. |
| * <p> |
| * DBInputFormat emits LongWritables containing the record number as |
| * key and DBWritables as value. |
| * |
 * The SQL query and input class can be set using one of the two
 * setInput methods.
| */ |
| public class DBInputFormat<T extends DBWritable> |
| extends InputFormat<LongWritable, T> implements Configurable { |
| |
| public static final Log LOG = LogFactory.getLog( |
| DBInputFormat.class.getName()); |
| private String dbProductName = "DEFAULT"; |
| |
| /** |
| * A Class that does nothing, implementing DBWritable. |
| */ |
| public static class NullDBWritable implements DBWritable, Writable { |
| @Override |
| public void readFields(DataInput in) throws IOException { } |
| @Override |
| public void readFields(ResultSet arg0) throws SQLException { } |
| @Override |
| public void write(DataOutput out) throws IOException { } |
| @Override |
| public void write(PreparedStatement arg0) throws SQLException { } |
| } |
| |
| /** |
| * A InputSplit that spans a set of rows. |
| */ |
| public static class DBInputSplit extends InputSplit implements Writable { |
| |
| private long end = 0; |
| private long start = 0; |
| |
| /** |
| * Default Constructor. |
| */ |
| public DBInputSplit() { |
| } |
| |
| /** |
| * Convenience Constructor. |
| * @param start the index of the first row to select |
| * @param end the index of the last row to select |
| */ |
| public DBInputSplit(long start, long end) { |
| this.start = start; |
| this.end = end; |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public String[] getLocations() throws IOException { |
| // TODO Add a layer to enable SQL "sharding" and support locality |
| return new String[] {}; |
| } |
| |
| /** |
| * @return The index of the first row to select |
| */ |
| public long getStart() { |
| return start; |
| } |
| |
| /** |
| * @return The index of the last row to select |
| */ |
| public long getEnd() { |
| return end; |
| } |
| |
| /** |
| * @return The total row count in this split |
| */ |
| public long getLength() throws IOException { |
| return end - start; |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public void readFields(DataInput input) throws IOException { |
| start = input.readLong(); |
| end = input.readLong(); |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public void write(DataOutput output) throws IOException { |
| output.writeLong(start); |
| output.writeLong(end); |
| } |
| } |
| |
| private String conditions; |
| |
| private Connection connection; |
| |
| private String tableName; |
| |
| private String[] fieldNames; |
| |
| private DBConfiguration dbConf; |
| |
| @Override |
| /** {@inheritDoc} */ |
| public void setConf(Configuration conf) { |
| setDbConf(new DBConfiguration(conf)); |
| } |
| |
| public void setDbConf(DBConfiguration dbConf) { |
| this.dbConf = dbConf; |
| |
| try { |
| getConnection(); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| |
| tableName = dbConf.getInputTableName(); |
| fieldNames = dbConf.getInputFieldNames(); |
| conditions = dbConf.getInputConditions(); |
| } |
| |
| private void setTxIsolation(Connection conn) { |
| try { |
| |
| if (getConf() |
| .getBoolean(DBConfiguration.PROP_RELAXED_ISOLATION, false)) { |
| if (dbProductName.startsWith("ORACLE")) { |
| LOG.info("Using read committed transaction isolation for Oracle" |
| + " as read uncommitted is not supported"); |
| this.connection.setTransactionIsolation( |
| Connection.TRANSACTION_READ_COMMITTED); |
| } else { |
| LOG.info("Using read uncommited transaction isolation"); |
| this.connection.setTransactionIsolation( |
| Connection.TRANSACTION_READ_UNCOMMITTED); |
| } |
| } |
| else { |
| LOG.info("Using read commited transaction isolation"); |
| this.connection.setTransactionIsolation( |
| Connection.TRANSACTION_READ_COMMITTED); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| public Configuration getConf() { |
| return dbConf.getConf(); |
| } |
| |
| public DBConfiguration getDBConf() { |
| return dbConf; |
| } |
| |
| public Connection getConnection() { |
| try { |
| |
| if (null == this.connection) { |
| // The connection was closed; reinstantiate it. |
| this.connection = dbConf.getConnection(); |
| this.connection.setAutoCommit(false); |
| DatabaseMetaData dbMeta = connection.getMetaData(); |
| this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); |
| setTxIsolation(connection); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| return connection; |
| } |
| |
| public String getDBProductName() { |
| return dbProductName; |
| } |
| |
| protected RecordReader<LongWritable, T> createDBRecordReader( |
| DBInputFormat.DBInputSplit split, |
| Configuration conf) throws IOException { |
| |
| @SuppressWarnings("unchecked") |
| Class<T> inputClass = (Class<T>) (dbConf.getInputClass()); |
| try { |
| // use database product name to determine appropriate record reader. |
| if (dbProductName.startsWith("ORACLE")) { |
| // use Oracle-specific db reader. |
| return new OracleDBRecordReader<T>(split, inputClass, |
| conf, getConnection(), getDBConf(), conditions, fieldNames, |
| tableName); |
| } else if (dbProductName.startsWith("DB2")) { |
| // use DB2-specific db reader. |
| return new Db2DBRecordReader<T>(split, inputClass, |
| conf, getConnection(), getDBConf(), conditions, fieldNames, |
| tableName); |
| } else { |
| // Generic reader. |
| return new DBRecordReader<T>(split, inputClass, |
| conf, getConnection(), getDBConf(), conditions, fieldNames, |
| tableName); |
| } |
| } catch (SQLException ex) { |
| throw new IOException(ex); |
| } |
| } |
| |
| @Override |
| /** {@inheritDoc} */ |
| public RecordReader<LongWritable, T> createRecordReader(InputSplit split, |
| TaskAttemptContext context) throws IOException, InterruptedException { |
| |
| return createDBRecordReader( |
| (DBInputFormat.DBInputSplit) split, |
| context.getConfiguration()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public List<InputSplit> getSplits(JobContext job) throws IOException { |
| |
| ResultSet results = null; |
| Statement statement = null; |
| try { |
| statement = connection.createStatement(); |
| |
| results = statement.executeQuery(getCountQuery()); |
| results.next(); |
| |
| long count = results.getLong(1); |
| int chunks = ConfigurationHelper.getJobNumMaps(job); |
| long chunkSize = (count / chunks); |
| |
| results.close(); |
| statement.close(); |
| |
| List<InputSplit> splits = new ArrayList<InputSplit>(); |
| |
| // Split the rows into n-number of chunks and adjust the last chunk |
| // accordingly |
| for (int i = 0; i < chunks; i++) { |
| DBInputSplit split; |
| |
| if ((i + 1) == chunks) { |
| split = new DBInputSplit(i * chunkSize, count); |
| } else { |
| split = new DBInputSplit(i * chunkSize, (i * chunkSize) |
| + chunkSize); |
| } |
| |
| splits.add(split); |
| } |
| |
| connection.commit(); |
| return splits; |
| } catch (SQLException e) { |
| throw new IOException("Got SQLException", e); |
| } finally { |
| try { |
| if (results != null) { results.close(); } |
| } catch (SQLException e1) { /* ignored */ } |
| try { |
| if (statement != null) { statement.close(); } |
| } catch (SQLException e1) { /* ignored */ } |
| |
| closeConnection(); |
| } |
| } |
| |
| /** Returns the query for getting the total number of rows, |
| * subclasses can override this for custom behaviour.*/ |
| protected String getCountQuery() { |
| |
| if (dbConf.getInputCountQuery() != null) { |
| return dbConf.getInputCountQuery(); |
| } |
| |
| StringBuilder query = new StringBuilder(); |
| query.append("SELECT COUNT(*) FROM " + tableName); |
| |
| if (conditions != null && conditions.length() > 0) { |
| query.append(" WHERE " + conditions); |
| } |
| return query.toString(); |
| } |
| |
| /** |
| * Initializes the map-part of the job with the appropriate input settings. |
| * |
| * @param job The map-reduce job |
| * @param inputClass the class object implementing DBWritable, which is the |
| * Java object holding tuple fields. |
| * @param tableName The table to read data from |
| * @param conditions The condition which to select data with, |
| * eg. '(updated > 20070101 AND length > 0)' |
| * @param orderBy the fieldNames in the orderBy clause. |
| * @param fieldNames The field names in the table |
| * @see #setInput(Job, Class, String, String) |
| */ |
| public static void setInput(Job job, |
| Class<? extends DBWritable> inputClass, |
| String tableName, String conditions, |
| String orderBy, String... fieldNames) { |
| job.setInputFormatClass(DBInputFormat.class); |
| DBConfiguration dbConf = new DBConfiguration(job.getConfiguration()); |
| dbConf.setInputClass(inputClass); |
| dbConf.setInputTableName(tableName); |
| dbConf.setInputFieldNames(fieldNames); |
| dbConf.setInputConditions(conditions); |
| dbConf.setInputOrderBy(orderBy); |
| } |
| |
| /** |
| * Initializes the map-part of the job with the appropriate input settings. |
| * |
| * @param job The map-reduce job |
| * @param inputClass the class object implementing DBWritable, which is the |
| * Java object holding tuple fields. |
| * @param inputQuery the input query to select fields. Example : |
| * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1" |
| * @param inputCountQuery the input query that returns |
| * the number of records in the table. |
| * Example : "SELECT COUNT(f1) FROM Mytable" |
| * @see #setInput(Job, Class, String, String, String, String...) |
| */ |
| public static void setInput(Job job, |
| Class<? extends DBWritable> inputClass, |
| String inputQuery, String inputCountQuery) { |
| job.setInputFormatClass(DBInputFormat.class); |
| DBConfiguration dbConf = new DBConfiguration(job.getConfiguration()); |
| dbConf.setInputClass(inputClass); |
| dbConf.setInputQuery(inputQuery); |
| dbConf.setInputCountQuery(inputCountQuery); |
| } |
| |
| protected void closeConnection() { |
| try { |
| if (null != this.connection) { |
| this.connection.close(); |
| this.connection = null; |
| } |
| } catch (SQLException sqlE) { |
| LOG.error("Cannot close JDBC connection.", sqlE); |
| } |
| } |
| |
| } |