/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping;
import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
* tables, decorating an underlying HBase TableInputFormat with extra Hive logic
* such as column pruning and filter pushdown.
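 *
 * <p>{@link #getSplits(JobConf, int)} prunes the scanned column families/qualifiers and
 * converts any pushed-down key or timestamp predicates into scan start/stop rows and a
 * time range before delegating split calculation to HBase; {@link #getRecordReader}
 * wraps the underlying mapreduce record reader in the mapred API that Hive expects.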
*/
public class HiveHBaseTableInputFormat extends TableInputFormatBase
implements InputFormat<ImmutableBytesWritable, ResultWritable> {
static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableInputFormat.class);
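  // Serializes split computation across all instances of this input format (see getSplits()).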
private static final Object hbaseTableMonitor = new Object();
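  // Lazily created HBase connection, shared by getSplits() and getRecordReader() and
  // closed once the corresponding table has been released.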
private Connection conn = null;
@Override
public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
InputSplit split,
JobConf jobConf,
final Reporter reporter) throws IOException {
HBaseSplit hbaseSplit = (HBaseSplit) split;
TableSplit tableSplit = hbaseSplit.getTableSplit();
if (conn == null) {
conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
}
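    // Bind the split's table on the shared connection and install the Scan that
    // HiveHBaseInputFormatUtil derives from the job conf before creating the HBase record reader.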
initializeTable(conn, tableSplit.getTable());
setScan(HiveHBaseInputFormatUtil.getScan(jobConf));
Job job = new Job(jobConf);
TaskAttemptContext tac = ShimLoader.getHadoopShims().newTaskAttemptContext(
job.getConfiguration(), reporter);
final org.apache.hadoop.mapreduce.RecordReader<ImmutableBytesWritable, Result>
recordReader = createRecordReader(tableSplit, tac);
try {
recordReader.initialize(tableSplit, tac);
} catch (InterruptedException e) {
      closeTable(); // Release the underlying HBase table
if (conn != null) {
conn.close();
conn = null;
}
throw new IOException("Failed to initialize RecordReader", e);
}
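    // Adapt the new-API (mapreduce) record reader to the old-API (mapred) RecordReader
    // that Hive's execution engine expects.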
return new RecordReader<ImmutableBytesWritable, ResultWritable>() {
@Override
public void close() throws IOException {
recordReader.close();
closeTable();
if (conn != null) {
conn.close();
conn = null;
}
}
@Override
public ImmutableBytesWritable createKey() {
return new ImmutableBytesWritable();
}
@Override
public ResultWritable createValue() {
return new ResultWritable(new Result());
}
@Override
public long getPos() throws IOException {
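        // Byte offsets are not meaningful for an HBase scan.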
return 0;
}
@Override
public float getProgress() throws IOException {
float progress = 0.0F;
try {
progress = recordReader.getProgress();
} catch (InterruptedException e) {
throw new IOException(e);
}
return progress;
}
@Override
public boolean next(ImmutableBytesWritable rowKey, ResultWritable value) throws IOException {
boolean next = false;
try {
next = recordReader.nextKeyValue();
if (next) {
rowKey.set(recordReader.getCurrentValue().getRow());
value.setResult(recordReader.getCurrentValue());
}
} catch (InterruptedException e) {
throw new IOException(e);
}
return next;
}
};
}
/**
* Converts a filter (which has been pushed down from Hive's optimizer)
* into corresponding restrictions on the HBase scan. The
* filter should already be in a form which can be fully converted.
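 *
 * <p>For illustration only (not an exhaustive contract): with a string or binary row
 * key column named {@code key}, a pushed-down predicate such as
 * {@code key >= 'a' AND key < 'b'} is translated, roughly, into
 * <pre>{@code
 *   scan.setStartRow(Bytes.toBytes("a"));
 *   scan.setStopRow(Bytes.toBytes("b"));
 * }</pre>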
*
* @param jobConf configuration for the scan
*
 * @param iKey 0-based offset of the key column within the Hive table
 *
 * @param iTimestamp 0-based offset of the timestamp column within the Hive table,
 * or a negative value if no timestamp column is mapped
 *
 * @param isKeyBinary true if the row key is stored in HBase's binary format
 *
 * @return a Scan carrying the converted restrictions; an unrestricted Scan if no
 * filter could be applied
*/
private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean isKeyBinary)
throws IOException {
// TODO: assert iKey is HBaseSerDe#HBASE_KEY_COL
Scan scan = new Scan();
String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR);
if (filterObjectSerialized != null) {
HBaseScanRange range = SerializationUtilities.deserializeObject(filterObjectSerialized,
HBaseScanRange.class);
try {
range.setup(scan, jobConf);
} catch (Exception e) {
throw new IOException(e);
}
return scan;
}
String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (filterExprSerialized == null) {
return scan;
}
ExprNodeGenericFuncDesc filterExpr =
SerializationUtilities.deserializeExpression(filterExprSerialized);
String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
ArrayList<TypeInfo> cols = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
String colType = cols.get(iKey).getTypeName();
boolean isKeyComparable = isKeyBinary || colType.equalsIgnoreCase("string");
String tsColName = null;
if (iTimestamp >= 0) {
tsColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iTimestamp];
}
IndexPredicateAnalyzer analyzer =
newIndexPredicateAnalyzer(keyColName, isKeyComparable, tsColName);
List<IndexSearchCondition> conditions = new ArrayList<IndexSearchCondition>();
ExprNodeDesc residualPredicate = analyzer.analyzePredicate(filterExpr, conditions);
    // There should be no residual predicate, since we already negotiated that earlier in
    // HBaseStorageHandler.decomposePredicate. However, with hive.optimize.index.filter,
    // OpProcFactory#pushFilterToStorageHandler pushes the original filter back down again.
    // Since pushed-down filters are not removed at the higher levels (and thus the
    // contract of negotiation is ignored anyway), just ignore the residual here.
    // Re-assess this once negotiation is honored and the duplicate evaluation is removed.
    // THIS IGNORES RESIDUAL PARSING FROM HBaseStorageHandler#decomposePredicate
if (residualPredicate != null) {
LOG.debug("Ignoring residual predicate " + residualPredicate.getExprString());
}
Map<String, List<IndexSearchCondition>> split = HiveHBaseInputFormatUtil.decompose(conditions);
List<IndexSearchCondition> keyConditions = split.get(keyColName);
if (keyConditions != null && !keyConditions.isEmpty()) {
setupKeyRange(scan, keyConditions, isKeyBinary);
}
List<IndexSearchCondition> tsConditions = split.get(tsColName);
if (tsConditions != null && !tsConditions.isEmpty()) {
setupTimeRange(scan, tsConditions);
}
return scan;
}
private void setupKeyRange(Scan scan, List<IndexSearchCondition> conditions, boolean isBinary)
throws IOException {
// Convert the search condition into a restriction on the HBase scan
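    // For example (illustrative): key = 'k' yields startRow = 'k' and stopRow = 'k' + 0x00,
    // i.e. a single-row range, since startRow is inclusive and stopRow is exclusive.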
byte [] startRow = HConstants.EMPTY_START_ROW, stopRow = HConstants.EMPTY_END_ROW;
for (IndexSearchCondition sc : conditions) {
ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
PrimitiveObjectInspector objInspector;
Object writable;
try {
objInspector = (PrimitiveObjectInspector)eval.initialize(null);
writable = eval.evaluate(null);
} catch (ClassCastException cce) {
throw new IOException("Currently only primitve types are supported. Found: " +
sc.getConstantDesc().getTypeString());
} catch (HiveException e) {
throw new IOException(e);
}
byte[] constantVal = getConstantVal(writable, objInspector, isBinary);
String comparisonOp = sc.getComparisonOp();
if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){
startRow = constantVal;
stopRow = getNextBA(constantVal);
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){
stopRow = constantVal;
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
.equals(comparisonOp)) {
startRow = constantVal;
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
.equals(comparisonOp)){
startRow = getNextBA(constantVal);
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
.equals(comparisonOp)){
stopRow = getNextBA(constantVal);
} else {
throw new IOException(comparisonOp + " is not a supported comparison operator");
}
}
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
if (LOG.isDebugEnabled()) {
LOG.debug(Bytes.toStringBinary(startRow) + " ~ " + Bytes.toStringBinary(stopRow));
}
}
private void setupTimeRange(Scan scan, List<IndexSearchCondition> conditions)
throws IOException {
long start = 0;
long end = Long.MAX_VALUE;
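    // Narrow HBase's [start, end) time range from the pushed-down timestamp predicates;
    // e.g. (illustrative) ts = 100 yields setTimeRange(100, 101).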
for (IndexSearchCondition sc : conditions) {
long timestamp = getTimestampVal(sc);
String comparisonOp = sc.getComparisonOp();
if("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)){
start = timestamp;
end = timestamp + 1;
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)){
end = timestamp;
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
.equals(comparisonOp)) {
start = timestamp;
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
.equals(comparisonOp)){
start = timestamp + 1;
} else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
.equals(comparisonOp)){
end = timestamp + 1;
} else {
throw new IOException(comparisonOp + " is not a supported comparison operator");
}
}
scan.setTimeRange(start, end);
}
private long getTimestampVal(IndexSearchCondition sc) throws IOException {
long timestamp;
try {
ExprNodeConstantEvaluator eval = new ExprNodeConstantEvaluator(sc.getConstantDesc());
ObjectInspector inspector = eval.initialize(null);
Object value = eval.evaluate(null);
if (inspector instanceof LongObjectInspector) {
timestamp = ((LongObjectInspector)inspector).get(value);
} else {
PrimitiveObjectInspector primitive = (PrimitiveObjectInspector) inspector;
timestamp = PrimitiveObjectInspectorUtils.getTimestamp(value, primitive).getTime();
}
} catch (HiveException e) {
throw new IOException(e);
}
return timestamp;
}
private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi,
boolean isKeyBinary) throws IOException{
if (!isKeyBinary){
      // The key is stored in text format, so serialize the constant to its UTF-8 text
      // representation as well.
byte[] startRow;
ByteStream.Output serializeStream = new ByteStream.Output();
LazyUtils.writePrimitiveUTF8(serializeStream, writable, poi, false, (byte) 0, null);
startRow = new byte[serializeStream.getLength()];
System.arraycopy(serializeStream.getData(), 0, startRow, 0, serializeStream.getLength());
return startRow;
}
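    // The key is stored in binary format, so encode the constant with HBase's Bytes
    // conversions to match the stored representation.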
PrimitiveCategory pc = poi.getPrimitiveCategory();
    switch (pc) {
case INT:
return Bytes.toBytes(((IntWritable)writable).get());
case BOOLEAN:
return Bytes.toBytes(((BooleanWritable)writable).get());
case LONG:
return Bytes.toBytes(((LongWritable)writable).get());
case FLOAT:
return Bytes.toBytes(((FloatWritable)writable).get());
case DOUBLE:
return Bytes.toBytes(((DoubleWritable)writable).get());
case SHORT:
return Bytes.toBytes(((ShortWritable)writable).get());
case STRING:
return Bytes.toBytes(((Text)writable).toString());
case BYTE:
return Bytes.toBytes(((ByteWritable)writable).get());
default:
throw new IOException("Type not supported " + pc);
}
}
private byte[] getNextBA(byte[] current){
    // startRow is inclusive while stopRow is exclusive, so this utility method returns the
    // byte array that immediately follows the current one in lexicographic order, obtained
    // by padding the current one with a trailing 0 byte.
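    // For example (illustrative): {0x61} -> {0x61, 0x00}.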
byte[] next = new byte[current.length + 1];
System.arraycopy(current, 0, next, 0, current.length);
return next;
}
/**
* Instantiates a new predicate analyzer suitable for
* determining how to push a filter down into the HBase scan,
* based on the rules for what kinds of pushdown we currently support.
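 *
 * <p>For illustration only, a binary-keyed table with no timestamp column would be
 * configured as
 * <pre>{@code
 *   IndexPredicateAnalyzer analyzer = newIndexPredicateAnalyzer("key", true, null);
 * }</pre>
 * which accepts range comparisons on {@code key} in addition to equality.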
*
 * @param keyColumnName name of the Hive column mapped to the HBase row key
 *
 * @param isKeyComparable true if range comparisons on the key can be pushed down
 * (the key is stored in binary format or is a string type)
 *
 * @param timestampColumn name of the Hive column mapped to the HBase timestamp,
 * or null if none is mapped
 *
 * @return preconfigured predicate analyzer
*/
static IndexPredicateAnalyzer newIndexPredicateAnalyzer(
String keyColumnName, boolean isKeyComparable, String timestampColumn) {
IndexPredicateAnalyzer analyzer = new IndexPredicateAnalyzer();
    // Equality predicates can always be pushed down; we just need the appropriate
    // byte-array representation of the filter constant.
    // Other comparisons can be pushed down only if the key is stored in binary format or
    // is a string type, since only then does lexicographic byte ordering match the value
    // ordering.
if (isKeyComparable) {
analyzer.addComparisonOp(keyColumnName,
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
} else {
analyzer.addComparisonOp(keyColumnName,
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual");
}
if (timestampColumn != null) {
analyzer.addComparisonOp(timestampColumn,
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan",
"org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan");
}
return analyzer;
}
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
synchronized (hbaseTableMonitor) {
return getSplitsInternal(jobConf, numSplits);
}
}
private InputSplit[] getSplitsInternal(JobConf jobConf, int numSplits) throws IOException {
    // Obtain delegation tokens for the job
if (UserGroupInformation.getCurrentUser().hasKerberosCredentials()) {
TableMapReduceUtil.initCredentials(jobConf);
}
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
if (conn == null) {
conn = ConnectionFactory.createConnection(HBaseConfiguration.create(jobConf));
}
TableName tableName = TableName.valueOf(hbaseTableName);
initializeTable(conn, tableName);
String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true);
try {
if (hbaseColumnsMapping == null) {
throw new IOException(HBaseSerDe.HBASE_COLUMNS_MAPPING + " required for HBase Table.");
}
ColumnMappings columnMappings = null;
try {
columnMappings = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching);
} catch (SerDeException e) {
throw new IOException(e);
}
int iKey = columnMappings.getKeyIndex();
int iTimestamp = columnMappings.getTimestampIndex();
ColumnMapping keyMapping = columnMappings.getKeyMapping();
// Take filter pushdown into account while calculating splits; this
// allows us to prune off regions immediately. Note that although
// the Javadoc for the superclass getSplits says that it returns one
// split per region, the implementation actually takes the scan
// definition into account and excludes regions which don't satisfy
// the start/stop row conditions (HBASE-1829).
Scan scan = createFilterScan(jobConf, iKey, iTimestamp,
HiveHBaseInputFormatUtil.getStorageFormatOfKey(keyMapping.mappingSpec,
jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
// The list of families that have been added to the scan
List<String> addedFamilies = new ArrayList<String>();
// REVIEW: are we supposed to be applying the getReadColumnIDs
// same as in getRecordReader?
for (ColumnMapping colMap : columnMappings) {
if (colMap.hbaseRowKey || colMap.hbaseTimestamp) {
continue;
}
if (colMap.qualifierName == null) {
scan.addFamily(colMap.familyNameBytes);
addedFamilies.add(colMap.familyName);
} else {
if(!addedFamilies.contains(colMap.familyName)){
// add the column only if the family has not already been added
scan.addColumn(colMap.familyNameBytes, colMap.qualifierNameBytes);
}
}
}
setScan(scan);
Job job = new Job(jobConf);
JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
List<org.apache.hadoop.mapreduce.InputSplit> splits =
super.getSplits(jobContext);
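      // Wrap each TableSplit with the Hive table path, since Hive's split handling
      // expects path-bearing (FileSplit-style) splits.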
InputSplit [] results = new InputSplit[splits.size()];
for (int i = 0; i < splits.size(); i++) {
results[i] = new HBaseSplit((TableSplit) splits.get(i), tablePaths[0]);
}
return results;
} finally {
closeTable();
if (conn != null) {
conn.close();
conn = null;
}
}
}
@Override
protected void finalize() throws Throwable {
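    // Last-resort cleanup in case the table/connection was not released through the
    // normal getSplits()/record-reader paths.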
try {
closeTable();
if (conn != null) {
conn.close();
conn = null;
}
} finally {
super.finalize();
}
}
}