/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.lib.db;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
/**
 * An InputFormat that reads input data from an SQL table.
 * Emits LongWritables containing the record number as key
 * and DBWritables as value. Configure the input with one of
 * the two static setInput methods below.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBInputFormat<T extends DBWritable>
extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
implements InputFormat<LongWritable, T>, JobConfigurable {
/**
* A RecordReader that reads records from a SQL table.
* Emits LongWritables containing the record number as
* key and DBWritables as value.
*/
protected class DBRecordReader extends
org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
implements RecordReader<LongWritable, T> {
/**
 * @param split The InputSplit to read data for
 * @param inputClass the class implementing DBWritable that holds the tuple fields
 * @param job the job configuration
 * @param conn an open JDBC connection
 * @param dbConfig the database configuration
 * @param cond the WHERE-clause conditions to select data with, or null
 * @param fields the names of the fields to select
 * @param table the name of the table to read from
 * @throws SQLException if a database access error occurs
 */
protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
String [] fields, String table) throws SQLException {
super(split, inputClass, job, conn, dbConfig, cond, fields, table);
}
/** {@inheritDoc} */
public LongWritable createKey() {
return new LongWritable();
}
/** {@inheritDoc} */
public T createValue() {
return super.createValue();
}
/** {@inheritDoc} */
public long getPos() throws IOException {
return super.getPos();
}
/** {@inheritDoc} */
public boolean next(LongWritable key, T value) throws IOException {
return super.next(key, value);
}
}
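// Illustrative sketch (not the actual framework code): the old-API
// MapReduce runtime drives a RecordReader roughly as follows, using the
// createKey/createValue/next methods implemented above.
//
//   LongWritable key = reader.createKey();
//   T value = reader.createValue();
//   while (reader.next(key, value)) {
//     // hand (key, value) to the mapper
//   }
//   reader.close();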
/**
* A RecordReader implementation that just passes through to a wrapped
* RecordReader built with the new API.
*/
private static class DBRecordReaderWrapper<T extends DBWritable>
implements RecordReader<LongWritable, T> {
private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;
public DBRecordReaderWrapper(
org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
this.rr = inner;
}
public void close() throws IOException {
rr.close();
}
public LongWritable createKey() {
return new LongWritable();
}
public T createValue() {
return rr.createValue();
}
public float getProgress() throws IOException {
return rr.getProgress();
}
public long getPos() throws IOException {
return rr.getPos();
}
public boolean next(LongWritable key, T value) throws IOException {
return rr.next(key, value);
}
}
/**
 * A class that does nothing, implementing DBWritable.
 */
public static class NullDBWritable extends
org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
implements DBWritable, Writable {
}
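// Sketch of a concrete input class (MyRecord is hypothetical): the input
// class must implement both DBWritable (the JDBC side) and Writable (the
// Hadoop serialization side), as NullDBWritable does above.
//
//   public class MyRecord implements Writable, DBWritable {
//     long id;
//     String name;
//     public void readFields(ResultSet rs) throws SQLException {
//       id = rs.getLong(1);
//       name = rs.getString(2);
//     }
//     public void write(PreparedStatement ps) throws SQLException {
//       ps.setLong(1, id);
//       ps.setString(2, name);
//     }
//     public void readFields(DataInput in) throws IOException {
//       id = in.readLong();
//       name = Text.readString(in);
//     }
//     public void write(DataOutput out) throws IOException {
//       out.writeLong(id);
//       Text.writeString(out, name);
//     }
//   }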
/**
 * An InputSplit that spans a set of rows.
 */
protected static class DBInputSplit extends
org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
implements InputSplit {
/**
* Default Constructor
*/
public DBInputSplit() {
}
/**
* Convenience Constructor
* @param start the index of the first row to select
* @param end the index of the last row to select
*/
public DBInputSplit(long start, long end) {
super(start, end);
}
}
/** {@inheritDoc} */
public void configure(JobConf job) {
super.setConf(job);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
// wrap the DBRR in a shim class to deal with API differences.
return new DBRecordReaderWrapper<T>(
(org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
createDBRecordReader(
(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
}
/** {@inheritDoc} */
public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
super.getSplits(new Job(job));
InputSplit[] ret = new InputSplit[newSplits.size()];
int i = 0;
// Convert each new-API split into an old-API DBInputSplit.
for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
}
return ret;
}
/**
 * Initializes the map-part of the job with the appropriate input settings.
 *
 * @param job The job
 * @param inputClass the class object implementing DBWritable, which is the
 * Java object holding tuple fields.
 * @param tableName The table to read data from
 * @param conditions The conditions used to select data, e.g. '(updated >
 * 20070101 AND length > 0)'
 * @param orderBy the field names used in the ORDER BY clause.
 * @param fieldNames The names of the fields to select from the table
 * @see #setInput(JobConf, Class, String, String)
 */
public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
String tableName, String conditions, String orderBy, String... fieldNames) {
job.setInputFormat(DBInputFormat.class);
DBConfiguration dbConf = new DBConfiguration(job);
dbConf.setInputClass(inputClass);
dbConf.setInputTableName(tableName);
dbConf.setInputFieldNames(fieldNames);
dbConf.setInputConditions(conditions);
dbConf.setInputOrderBy(orderBy);
}
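// Usage sketch (illustrative): a driver might configure table-based input
// like this. MyRecord, the driver class, and the connection settings are
// hypothetical placeholders.
//
//   JobConf job = new JobConf(MyDriver.class);
//   DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
//       "jdbc:mysql://localhost/mydb", "user", "password");
//   DBInputFormat.setInput(job, MyRecord.class, "employees",
//       "salary > 0", "name", "id", "name", "salary");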
/**
 * Initializes the map-part of the job with the appropriate input settings.
 *
 * @param job The job
 * @param inputClass the class object implementing DBWritable, which is the
 * Java object holding tuple fields.
 * @param inputQuery the input query used to select fields, e.g.
 * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
 * @param inputCountQuery the input query that returns the number of records
 * in the table, e.g. "SELECT COUNT(f1) FROM Mytable"
 * @see #setInput(JobConf, Class, String, String, String, String...)
 */
public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
String inputQuery, String inputCountQuery) {
job.setInputFormat(DBInputFormat.class);
DBConfiguration dbConf = new DBConfiguration(job);
dbConf.setInputClass(inputClass);
dbConf.setInputQuery(inputQuery);
dbConf.setInputCountQuery(inputCountQuery);
}
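// Usage sketch for the query-based variant (illustrative; the queries are
// placeholders): the count query must return the number of records the
// input query will produce, so the splits can be sized.
//
//   DBInputFormat.setInput(job, MyRecord.class,
//       "SELECT id, name, salary FROM employees ORDER BY id",
//       "SELECT COUNT(id) FROM employees");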
}