blob: ccd84176d7e1c8fdd685857d9243649d7e7ffd8f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapred;
import java.io.Closeable;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
/**
* A Base for {@link TableInputFormat}s. Receives a {@link Table}, a
* byte[] of input columns and optionally a {@link Filter}.
* Subclasses may use other TableRecordReader implementations.
*
* Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
* function properly. Each of the entry points to this class used by the MapReduce framework,
* {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
* will call {@link #initialize(JobConf)} as a convenient centralized location to handle
* retrieving the necessary configuration information. If your subclass overrides either of these
* methods, either call the parent version or call initialize yourself.
*
* <p>
* An example of a subclass:
* <pre>
* class ExampleTIF extends TableInputFormatBase {
*
* {@literal @}Override
* protected void initialize(JobConf context) throws IOException {
* // We are responsible for the lifecycle of this connection until we hand it over in
* // initializeTable.
* Connection connection =
* ConnectionFactory.createConnection(HBaseConfiguration.create(job));
* TableName tableName = TableName.valueOf("exampleTable");
* // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
* initializeTable(connection, tableName);
* byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
* Bytes.toBytes("columnB") };
* // mandatory
* setInputColumns(inputColumns);
* // optional, by default we'll get everything for the given columns.
* Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
* setRowFilter(exampleFilter);
* }
* }
* </pre>
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, Result> {
private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
private byte [][] inputColumns;
private Table table;
private RegionLocator regionLocator;
private Connection connection;
private TableRecordReader tableRecordReader;
private Filter rowFilter;
private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
"initialized. Ensure you call initializeTable either in your constructor or initialize " +
"method";
private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
" previous error. Please look at the previous logs lines from" +
" the task's full log for more details.";
/**
* Builds a TableRecordReader. If no TableRecordReader was provided, uses
* the default.
*
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
* JobConf, Reporter)
*/
public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
InputSplit split, JobConf job, Reporter reporter)
throws IOException {
// In case a subclass uses the deprecated approach or calls initializeTable directly
if (table == null) {
initialize(job);
}
// null check in case our child overrides getTable to not throw.
try {
if (getTable() == null) {
// initialize() must not have been implemented in the subclass.
throw new IOException(INITIALIZATION_ERROR);
}
} catch (IllegalStateException exception) {
throw new IOException(INITIALIZATION_ERROR, exception);
}
TableSplit tSplit = (TableSplit) split;
// if no table record reader was provided use default
final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
this.tableRecordReader;
trr.setStartRow(tSplit.getStartRow());
trr.setEndRow(tSplit.getEndRow());
trr.setHTable(this.table);
trr.setInputColumns(this.inputColumns);
trr.setRowFilter(this.rowFilter);
trr.init();
return new RecordReader<ImmutableBytesWritable, Result>() {
@Override
public void close() throws IOException {
trr.close();
closeTable();
}
@Override
public ImmutableBytesWritable createKey() {
return trr.createKey();
}
@Override
public Result createValue() {
return trr.createValue();
}
@Override
public long getPos() throws IOException {
return trr.getPos();
}
@Override
public float getProgress() throws IOException {
return trr.getProgress();
}
@Override
public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
return trr.next(key, value);
}
};
}
/**
* Calculates the splits that will serve as input for the map tasks.
*
* Splits are created in number equal to the smallest between numSplits and
* the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table.
* If the number of splits is smaller than the number of
* {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across
* multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s
* and are grouped the most evenly possible. In the
* case splits are uneven the bigger splits are placed first in the
* {@link InputSplit} array.
*
* @param job the map task {@link JobConf}
* @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
*
* @return the input splits
*
* @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
*/
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
if (this.table == null) {
initialize(job);
}
// null check in case our child overrides getTable to not throw.
try {
if (getTable() == null) {
// initialize() must not have been implemented in the subclass.
throw new IOException(INITIALIZATION_ERROR);
}
} catch (IllegalStateException exception) {
throw new IOException(INITIALIZATION_ERROR, exception);
}
byte [][] startKeys = this.regionLocator.getStartKeys();
if (startKeys == null || startKeys.length == 0) {
throw new IOException("Expecting at least one region");
}
if (this.inputColumns == null || this.inputColumns.length == 0) {
throw new IOException("Expecting at least one column");
}
int realNumSplits = numSplits > startKeys.length? startKeys.length:
numSplits;
InputSplit[] splits = new InputSplit[realNumSplits];
int middle = startKeys.length / realNumSplits;
int startPos = 0;
for (int i = 0; i < realNumSplits; i++) {
int lastPos = startPos + middle;
lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]).
getHostname();
splits[i] = new TableSplit(this.table.getName(),
startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
HConstants.EMPTY_START_ROW, regionLocation);
LOG.info("split: " + i + "->" + splits[i]);
startPos = lastPos;
}
return splits;
}
/**
* Allows subclasses to initialize the table information.
*
* @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
* @param tableName The {@link TableName} of the table to process.
* @throws IOException
*/
protected void initializeTable(Connection connection, TableName tableName) throws IOException {
if (this.table != null || this.connection != null) {
LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
"reference; TableInputFormatBase will not close these old references when done.");
}
this.table = connection.getTable(tableName);
this.regionLocator = connection.getRegionLocator(tableName);
this.connection = connection;
}
/**
* @param inputColumns to be passed in {@link Result} to the map task.
*/
protected void setInputColumns(byte [][] inputColumns) {
this.inputColumns = inputColumns;
}
/**
* Allows subclasses to get the {@link Table}.
*/
protected Table getTable() {
if (table == null) {
throw new IllegalStateException(NOT_INITIALIZED);
}
return this.table;
}
/**
* Allows subclasses to set the {@link TableRecordReader}.
*
* @param tableRecordReader
* to provide other {@link TableRecordReader} implementations.
*/
protected void setTableRecordReader(TableRecordReader tableRecordReader) {
this.tableRecordReader = tableRecordReader;
}
/**
* Allows subclasses to set the {@link Filter} to be used.
*
* @param rowFilter
*/
protected void setRowFilter(Filter rowFilter) {
this.rowFilter = rowFilter;
}
/**
* Handle subclass specific set up.
* Each of the entry points used by the MapReduce framework,
* {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
* will call {@link #initialize(JobConf)} as a convenient centralized location to handle
* retrieving the necessary configuration information and calling
* {@link #initializeTable(Connection, TableName)}.
*
* Subclasses should implement their initialize call such that it is safe to call multiple times.
* The current TableInputFormatBase implementation relies on a non-null table reference to decide
* if an initialize call is needed, but this behavior may change in the future. In particular,
* it is critical that initializeTable not be called multiple times since this will leak
* Connection instances.
*
*/
protected void initialize(JobConf job) throws IOException {
}
/**
* Close the Table and related objects that were initialized via
* {@link #initializeTable(Connection, TableName)}.
*
* @throws IOException
*/
protected void closeTable() throws IOException {
close(table, connection);
table = null;
connection = null;
}
private void close(Closeable... closables) throws IOException {
for (Closeable c : closables) {
if(c != null) { c.close(); }
}
}
}