| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.mapred; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.hbase.classification.InterfaceAudience; |
| import org.apache.hadoop.hbase.classification.InterfaceStability; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.RegionLocator; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.filter.Filter; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.mapred.InputFormat; |
| import org.apache.hadoop.mapred.InputSplit; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.mapred.Reporter; |
| |
| /** |
| * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a |
| * byte[] of input columns and optionally a {@link Filter}. |
| * Subclasses may use other TableRecordReader implementations. |
| * |
| * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to |
| * function properly. Each of the entry points to this class used by the MapReduce framework, |
| * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, |
| * will call {@link #initialize(JobConf)} as a convenient centralized location to handle |
| * retrieving the necessary configuration information. If your subclass overrides either of these |
| * methods, either call the parent version or call initialize yourself. |
| * |
| * <p> |
| * An example of a subclass: |
| * <pre> |
| * class ExampleTIF extends TableInputFormatBase { |
| * |
| * {@literal @}Override |
| * protected void initialize(JobConf context) throws IOException { |
| * // We are responsible for the lifecycle of this connection until we hand it over in |
| * // initializeTable. |
| * Connection connection = |
| * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); |
| * TableName tableName = TableName.valueOf("exampleTable"); |
| * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. |
| * initializeTable(connection, tableName); |
| * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), |
| * Bytes.toBytes("columnB") }; |
| * // mandatory |
| * setInputColumns(inputColumns); |
| * // optional, by default we'll get everything for the given columns. |
| * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); |
| * setRowFilter(exampleFilter); |
| * } |
| * } |
| * </pre> |
| */ |
| |
| @InterfaceAudience.Public |
| @InterfaceStability.Stable |
| public abstract class TableInputFormatBase |
| implements InputFormat<ImmutableBytesWritable, Result> { |
| private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); |
| private byte [][] inputColumns; |
| private Table table; |
| private RegionLocator regionLocator; |
| private Connection connection; |
| private TableRecordReader tableRecordReader; |
| private Filter rowFilter; |
| |
| private static final String NOT_INITIALIZED = "The input format instance has not been properly " + |
| "initialized. Ensure you call initializeTable either in your constructor or initialize " + |
| "method"; |
| private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + |
| " previous error. Please look at the previous logs lines from" + |
| " the task's full log for more details."; |
| |
| /** |
| * Builds a TableRecordReader. If no TableRecordReader was provided, uses |
| * the default. |
| * |
| * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, |
| * JobConf, Reporter) |
| */ |
| public RecordReader<ImmutableBytesWritable, Result> getRecordReader( |
| InputSplit split, JobConf job, Reporter reporter) |
| throws IOException { |
| // In case a subclass uses the deprecated approach or calls initializeTable directly |
| if (table == null) { |
| initialize(job); |
| } |
| // null check in case our child overrides getTable to not throw. |
| try { |
| if (getTable() == null) { |
| // initialize() must not have been implemented in the subclass. |
| throw new IOException(INITIALIZATION_ERROR); |
| } |
| } catch (IllegalStateException exception) { |
| throw new IOException(INITIALIZATION_ERROR, exception); |
| } |
| |
| TableSplit tSplit = (TableSplit) split; |
| // if no table record reader was provided use default |
| final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() : |
| this.tableRecordReader; |
| trr.setStartRow(tSplit.getStartRow()); |
| trr.setEndRow(tSplit.getEndRow()); |
| trr.setHTable(this.table); |
| trr.setInputColumns(this.inputColumns); |
| trr.setRowFilter(this.rowFilter); |
| trr.init(); |
| return new RecordReader<ImmutableBytesWritable, Result>() { |
| |
| @Override |
| public void close() throws IOException { |
| trr.close(); |
| closeTable(); |
| } |
| |
| @Override |
| public ImmutableBytesWritable createKey() { |
| return trr.createKey(); |
| } |
| |
| @Override |
| public Result createValue() { |
| return trr.createValue(); |
| } |
| |
| @Override |
| public long getPos() throws IOException { |
| return trr.getPos(); |
| } |
| |
| @Override |
| public float getProgress() throws IOException { |
| return trr.getProgress(); |
| } |
| |
| @Override |
| public boolean next(ImmutableBytesWritable key, Result value) throws IOException { |
| return trr.next(key, value); |
| } |
| }; |
| } |
| |
| /** |
| * Calculates the splits that will serve as input for the map tasks. |
| * |
| * Splits are created in number equal to the smallest between numSplits and |
| * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. |
| * If the number of splits is smaller than the number of |
| * {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across |
| * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s |
| * and are grouped the most evenly possible. In the |
| * case splits are uneven the bigger splits are placed first in the |
| * {@link InputSplit} array. |
| * |
| * @param job the map task {@link JobConf} |
| * @param numSplits a hint to calculate the number of splits (mapred.map.tasks). |
| * |
| * @return the input splits |
| * |
| * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) |
| */ |
| public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { |
| if (this.table == null) { |
| initialize(job); |
| } |
| // null check in case our child overrides getTable to not throw. |
| try { |
| if (getTable() == null) { |
| // initialize() must not have been implemented in the subclass. |
| throw new IOException(INITIALIZATION_ERROR); |
| } |
| } catch (IllegalStateException exception) { |
| throw new IOException(INITIALIZATION_ERROR, exception); |
| } |
| |
| byte [][] startKeys = this.regionLocator.getStartKeys(); |
| if (startKeys == null || startKeys.length == 0) { |
| throw new IOException("Expecting at least one region"); |
| } |
| if (this.inputColumns == null || this.inputColumns.length == 0) { |
| throw new IOException("Expecting at least one column"); |
| } |
| int realNumSplits = numSplits > startKeys.length? startKeys.length: |
| numSplits; |
| InputSplit[] splits = new InputSplit[realNumSplits]; |
| int middle = startKeys.length / realNumSplits; |
| int startPos = 0; |
| for (int i = 0; i < realNumSplits; i++) { |
| int lastPos = startPos + middle; |
| lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; |
| String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]). |
| getHostname(); |
| splits[i] = new TableSplit(this.table.getName(), |
| startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: |
| HConstants.EMPTY_START_ROW, regionLocation); |
| LOG.info("split: " + i + "->" + splits[i]); |
| startPos = lastPos; |
| } |
| return splits; |
| } |
| |
| /** |
| * Allows subclasses to initialize the table information. |
| * |
| * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. |
| * @param tableName The {@link TableName} of the table to process. |
| * @throws IOException |
| */ |
| protected void initializeTable(Connection connection, TableName tableName) throws IOException { |
| if (this.table != null || this.connection != null) { |
| LOG.warn("initializeTable called multiple times. Overwriting connection and table " + |
| "reference; TableInputFormatBase will not close these old references when done."); |
| } |
| this.table = connection.getTable(tableName); |
| this.regionLocator = connection.getRegionLocator(tableName); |
| this.connection = connection; |
| } |
| |
| /** |
| * @param inputColumns to be passed in {@link Result} to the map task. |
| */ |
| protected void setInputColumns(byte [][] inputColumns) { |
| this.inputColumns = inputColumns; |
| } |
| |
| /** |
| * Allows subclasses to get the {@link Table}. |
| */ |
| protected Table getTable() { |
| if (table == null) { |
| throw new IllegalStateException(NOT_INITIALIZED); |
| } |
| return this.table; |
| } |
| |
| /** |
| * Allows subclasses to set the {@link TableRecordReader}. |
| * |
| * @param tableRecordReader |
| * to provide other {@link TableRecordReader} implementations. |
| */ |
| protected void setTableRecordReader(TableRecordReader tableRecordReader) { |
| this.tableRecordReader = tableRecordReader; |
| } |
| |
| /** |
| * Allows subclasses to set the {@link Filter} to be used. |
| * |
| * @param rowFilter |
| */ |
| protected void setRowFilter(Filter rowFilter) { |
| this.rowFilter = rowFilter; |
| } |
| |
| /** |
| * Handle subclass specific set up. |
| * Each of the entry points used by the MapReduce framework, |
| * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, |
| * will call {@link #initialize(JobConf)} as a convenient centralized location to handle |
| * retrieving the necessary configuration information and calling |
| * {@link #initializeTable(Connection, TableName)}. |
| * |
| * Subclasses should implement their initialize call such that it is safe to call multiple times. |
| * The current TableInputFormatBase implementation relies on a non-null table reference to decide |
| * if an initialize call is needed, but this behavior may change in the future. In particular, |
| * it is critical that initializeTable not be called multiple times since this will leak |
| * Connection instances. |
| * |
| */ |
| protected void initialize(JobConf job) throws IOException { |
| } |
| |
| /** |
| * Close the Table and related objects that were initialized via |
| * {@link #initializeTable(Connection, TableName)}. |
| * |
| * @throws IOException |
| */ |
| protected void closeTable() throws IOException { |
| close(table, connection); |
| table = null; |
| connection = null; |
| } |
| |
| private void close(Closeable... closables) throws IOException { |
| for (Closeable c : closables) { |
| if(c != null) { c.close(); } |
| } |
| } |
| } |