// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.StringLiteral;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.HBaseColumn;
import org.apache.impala.catalog.HBaseTable;
import org.apache.impala.catalog.PrimitiveType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.THBaseFilter;
import org.apache.impala.thrift.THBaseKeyRange;
import org.apache.impala.thrift.THBaseScanNode;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRange;
import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.util.MembershipSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* Full scan of an HBase table.
* Only the families/qualifiers specified in the TupleDescriptor are retrieved in the backend.
*/
public class HBaseScanNode extends ScanNode {
private final static Logger LOG = LoggerFactory.getLogger(HBaseScanNode.class);
private final TupleDescriptor desc_;
// One range per clustering column. The range bounds are expected to be constants.
// A null entry means there's no range restriction for that particular key.
// If keyRanges is non-null it always contains as many entries as there are clustering
// cols.
private List<ValueRange> keyRanges_;
// Derived from keyRanges_; empty means unbounded.
// start/stopKey_ are initialized to be unbounded.
private byte[] startKey_ = HConstants.EMPTY_START_ROW;
private byte[] stopKey_ = HConstants.EMPTY_END_ROW;
// True if this scan node is not going to scan anything. If the row key filter
// evaluates to null, or if the lower bound > upper bound, then this scan node won't
// scan at all.
private boolean isEmpty_ = false;
// List of HBase Filters for generating the thrift message. Filled in
// createHBaseFilters().
private final List<THBaseFilter> filters_ = new ArrayList<THBaseFilter>();
// The suggested value for "hbase.client.scan.setCaching", which batches maxCaching
// rows per fetch request to the HBase region server. If the value is too high,
// then the hbase region server will have a hard time (GC pressure and long response
// times). If the value is too small, then there will be extra trips to the hbase
// region server.
// Default to 1024 and update it based on row size estimate such that each batch size
// won't exceed 500MB.
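// For example, with an estimated row size of 5KB, the suggested caching becomes
// 500MB / 5KB = 102400 rows per batch.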
private final static int MAX_HBASE_FETCH_BATCH_SIZE = 500 * 1024 * 1024;
private final static int DEFAULT_SUGGESTED_CACHING = 1024;
private int suggestedCaching_ = DEFAULT_SUGGESTED_CACHING;
public HBaseScanNode(PlanNodeId id, TupleDescriptor desc) {
super(id, desc, "SCAN HBASE");
desc_ = desc;
}
public void setKeyRanges(List<ValueRange> keyRanges) {
Preconditions.checkNotNull(keyRanges);
keyRanges_ = keyRanges;
}
@Override
public void init(Analyzer analyzer) throws ImpalaException {
checkForSupportedFileFormats();
assignConjuncts(analyzer);
conjuncts_ = orderConjunctsByCost(conjuncts_);
setStartStopKey(analyzer);
// Convert predicates to HBase filters_.
createHBaseFilters(analyzer);
// materialize slots in remaining conjuncts_
analyzer.materializeSlots(conjuncts_);
computeMemLayout(analyzer);
computeScanRangeLocations(analyzer);
// Call computeStats() after materializing slots and computing the mem layout.
computeStats(analyzer);
}
/**
* Convert keyRanges_ to startKey_ and stopKey_.
* If ValueRange is not null, transform it into start/stopKey_ by evaluating the
* expression. Analysis has checked that the expression is of string type. If the
* expression evaluates to null, then there's nothing to scan because an HBase row
* key cannot be null.
* At present, we only do row key filtering for string-mapped keys. String-mapped
* keys are always encoded as ASCII.
* ValueRange is null if there is no predicate on the row-key.
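* For example, the predicate "key >= 'a' AND key <= 'b'" yields startKey_ = "a" and
* stopKey_ = "b\0", since HBase stop keys are exclusive (see convertToBytes()).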
*/
private void setStartStopKey(Analyzer analyzer) throws ImpalaException {
Preconditions.checkNotNull(keyRanges_);
Preconditions.checkState(keyRanges_.size() == 1);
ValueRange rowRange = keyRanges_.get(0);
if (rowRange != null) {
if (rowRange.getLowerBound() != null) {
Preconditions.checkState(rowRange.getLowerBound().isConstant());
Preconditions.checkState(
rowRange.getLowerBound().getType().equals(Type.STRING));
LiteralExpr val = LiteralExpr.create(rowRange.getLowerBound(),
analyzer.getQueryCtx());
if (val instanceof StringLiteral) {
StringLiteral litVal = (StringLiteral) val;
startKey_ = convertToBytes(litVal.getStringValue(),
!rowRange.getLowerBoundInclusive());
} else {
// lower bound is null.
isEmpty_ = true;
return;
}
}
if (rowRange.getUpperBound() != null) {
Preconditions.checkState(rowRange.getUpperBound().isConstant());
Preconditions.checkState(
rowRange.getUpperBound().getType().equals(Type.STRING));
LiteralExpr val = LiteralExpr.create(rowRange.getUpperBound(),
analyzer.getQueryCtx());
if (val instanceof StringLiteral) {
StringLiteral litVal = (StringLiteral) val;
stopKey_ = convertToBytes(litVal.getStringValue(),
rowRange.getUpperBoundInclusive());
} else {
// upper bound is null.
isEmpty_ = true;
return;
}
}
}
boolean endKeyIsEndOfTable = Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey_, stopKey_) > 0) && !endKeyIsEndOfTable) {
// Lower bound is greater than upper bound.
isEmpty_ = true;
}
}
/**
* Also sets suggestedCaching_.
*/
@Override
public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
HBaseTable tbl = (HBaseTable) desc_.getTable();
ValueRange rowRange = keyRanges_.get(0);
if (isEmpty_) {
cardinality_ = 0;
} else if (rowRange != null && rowRange.isEqRange()) {
cardinality_ = 1;
} else {
// Update suggestedCaching_ so that each fetch from HBase won't return a batch of
// more than MAX_HBASE_FETCH_BATCH_SIZE bytes.
Pair<Long, Long> estimate = tbl.getEstimatedRowStats(startKey_, stopKey_);
cardinality_ = estimate.first.longValue();
if (estimate.second.longValue() > 0) {
suggestedCaching_ = (int)
Math.max(MAX_HBASE_FETCH_BATCH_SIZE / estimate.second.longValue(), 1);
}
}
inputCardinality_ = cardinality_;
cardinality_ *= computeSelectivity();
cardinality_ = Math.max(1, cardinality_);
cardinality_ = capAtLimit(cardinality_);
if (LOG.isTraceEnabled()) {
LOG.trace("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_));
}
// Assume that each node in the cluster gets a scan range, unless there are fewer
// scan ranges than nodes.
numNodes_ = Math.max(1,
Math.min(scanRanges_.size(), MembershipSnapshot.getCluster().numNodes()));
if (LOG.isTraceEnabled()) {
LOG.trace("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_));
}
}
@Override
protected String debugString() {
HBaseTable tbl = (HBaseTable) desc_.getTable();
return Objects.toStringHelper(this)
.add("tid", desc_.getId().asInt())
.add("hiveTblName", tbl.getFullName())
.add("hbaseTblName", tbl.getHBaseTableName())
.add("startKey", ByteBuffer.wrap(startKey_).toString())
.add("stopKey", ByteBuffer.wrap(stopKey_).toString())
.add("isEmpty", isEmpty_)
.addValue(super.debugString())
.toString();
}
// We convert predicates of the form <slotref> op <constant> where slotref is of
// type string to HBase filters. All these predicates are also evaluated at
// the HBaseScanNode. To properly filter out NULL values HBaseScanNode treats all
// predicates as disjunctive, thereby requiring re-evaluation when there are multiple
// attributes. We explicitly materialize the referenced slots, otherwise our hbase
// scans don't return correct data.
// TODO: expand this to generate nested filter lists for arbitrary conjunctions
// and disjunctions.
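// For example, "stringcol = 'abc'" on an HBase column with family "cf" and
// qualifier "q" becomes THBaseFilter("cf", "q", EQUAL.ordinal(), "abc").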
private void createHBaseFilters(Analyzer analyzer) {
for (Expr e: conjuncts_) {
// We only consider binary predicates
if (!(e instanceof BinaryPredicate)) continue;
BinaryPredicate bp = (BinaryPredicate) e;
CompareFilter.CompareOp hbaseOp = impalaOpToHBaseOp(bp.getOp());
// Ignore unsupported ops
if (hbaseOp == null) continue;
for (SlotDescriptor slot: desc_.getSlots()) {
// Only push down predicates on string columns
if (slot.getType().getPrimitiveType() != PrimitiveType.STRING) continue;
Expr bindingExpr = bp.getSlotBinding(slot.getId());
if (!(bindingExpr instanceof StringLiteral)) continue;
StringLiteral literal = (StringLiteral) bindingExpr;
HBaseColumn col = (HBaseColumn) slot.getColumn();
filters_.add(new THBaseFilter(
col.getColumnFamily(), col.getColumnQualifier(),
(byte) hbaseOp.ordinal(), literal.getUnescapedValue()));
analyzer.materializeSlots(Lists.newArrayList(e));
}
}
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.HBASE_SCAN_NODE;
HBaseTable tbl = (HBaseTable) desc_.getTable();
msg.hbase_scan_node =
new THBaseScanNode(desc_.getId().asInt(), tbl.getHBaseTableName());
if (!filters_.isEmpty()) {
msg.hbase_scan_node.setFilters(filters_);
}
msg.hbase_scan_node.setSuggested_max_caching(suggestedCaching_);
}
/**
* We create a TScanRange for each run of adjacent relevant regions hosted by the
* same region server; adjacent regions are merged into a single key range to
* minimize the number of scanner initializations in the backend.
*/
private void computeScanRangeLocations(Analyzer analyzer) {
scanRanges_ = Lists.newArrayList();
// For empty scan node, return an empty list.
if (isEmpty_) return;
// Retrieve relevant HBase regions and their region servers
HBaseTable tbl = (HBaseTable) desc_.getTable();
List<HRegionLocation> regionsLoc;
// Use try-with-resources so the table handle is closed even if the region
// lookup throws.
try (org.apache.hadoop.hbase.client.Table hbaseTbl = tbl.getHBaseTable()) {
regionsLoc = HBaseTable.getRegionsInRange(hbaseTbl, startKey_, stopKey_);
} catch (IOException e) {
throw new RuntimeException(
"couldn't retrieve HBase table (" + tbl.getHBaseTableName() + ") info:\n"
+ e.getMessage(), e);
}
// Convert list of HRegionLocation to Map<hostport, List<HRegionLocation>>.
// The List<HRegionLocations>'s end up being sorted by start key/end key, because
// regionsLoc is sorted that way.
Map<String, List<HRegionLocation>> locationMap = Maps.newHashMap();
for (HRegionLocation regionLoc: regionsLoc) {
String locHostPort = regionLoc.getHostnamePort();
if (locationMap.containsKey(locHostPort)) {
locationMap.get(locHostPort).add(regionLoc);
} else {
locationMap.put(locHostPort, Lists.newArrayList(regionLoc));
}
}
for (Map.Entry<String, List<HRegionLocation>> locEntry: locationMap.entrySet()) {
// HBaseTableScanner (in the backend) initializes a result scanner for each key
// range. To minimize the number of result scanner re-initializations, create only
// a single HBaseKeyRange for all adjacent regions on this server.
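// E.g., regions [a,b), [b,c), and [d,e) on one server produce key ranges [a,c)
// and [d,e).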
THBaseKeyRange keyRange = null;
byte[] prevEndKey = null;
for (HRegionLocation regionLoc: locEntry.getValue()) {
byte[] curRegStartKey = regionLoc.getRegionInfo().getStartKey();
byte[] curRegEndKey = regionLoc.getRegionInfo().getEndKey();
if (prevEndKey != null &&
Bytes.compareTo(prevEndKey, curRegStartKey) == 0) {
// the current region starts where the previous one left off;
// extend the key range
setKeyRangeEnd(keyRange, curRegEndKey);
} else {
// create a new HBaseKeyRange (and a TScanRange/TScanRangeLocationList to go
// with it).
keyRange = new THBaseKeyRange();
setKeyRangeStart(keyRange, curRegStartKey);
setKeyRangeEnd(keyRange, curRegEndKey);
TScanRangeLocationList scanRangeLocation = new TScanRangeLocationList();
TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey());
scanRangeLocation.addToLocations(
new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));
scanRanges_.add(scanRangeLocation);
TScanRange scanRange = new TScanRange();
scanRange.setHbase_key_range(keyRange);
scanRangeLocation.setScan_range(scanRange);
}
prevEndKey = curRegEndKey;
}
}
}
/**
* Set the start key of keyRange using the provided key, bounded by startKey_
* @param keyRange the keyRange to be updated
* @param rangeStartKey the start key value to be set to
*/
private void setKeyRangeStart(THBaseKeyRange keyRange, byte[] rangeStartKey) {
keyRange.unsetStartKey();
// use max(startKey_, rangeStartKey) as the scan start key
if (!Bytes.equals(rangeStartKey, HConstants.EMPTY_START_ROW) ||
!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
byte[] partStart = (Bytes.compareTo(rangeStartKey, startKey_) < 0) ?
startKey_ : rangeStartKey;
keyRange.setStartKey(Bytes.toString(partStart));
}
}
/**
* Set the end key of keyRange using the provided key, bounded by stopKey_
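* E.g., with stopKey_ = "m", a region end key "z" is clamped to "m", and an empty
* region end key (meaning end-of-table) is replaced by "m".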
* @param keyRange the keyRange to be updated
* @param rangeEndKey the end key value to be set to
*/
private void setKeyRangeEnd(THBaseKeyRange keyRange, byte[] rangeEndKey) {
keyRange.unsetStopKey();
// use min(stopKey_, rangeEndKey) as the scan stop key
if (!Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW) ||
!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
if (Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
keyRange.setStopKey(Bytes.toString(rangeEndKey));
} else if (Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW)) {
keyRange.setStopKey(Bytes.toString(stopKey_));
} else {
byte[] partEnd = (Bytes.compareTo(rangeEndKey, stopKey_) < 0) ?
rangeEndKey : stopKey_;
keyRange.setStopKey(Bytes.toString(partEnd));
}
}
}
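// Illustrative output of getNodeExplainString() at STANDARD detail level (table
// name and keys below are hypothetical):
//   00:SCAN HBASE [functional_hbase.alltypes]
//      start key: a
//      stop key: b\0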
@Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
HBaseTable table = (HBaseTable) desc_.getTable();
StringBuilder output = new StringBuilder();
if (isEmpty_) {
output.append(prefix + "empty scan node\n");
return output.toString();
}
String aliasStr = "";
if (!table.getFullName().equalsIgnoreCase(desc_.getAlias()) &&
!table.getName().equalsIgnoreCase(desc_.getAlias())) {
aliasStr = " " + desc_.getAlias();
}
output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(),
displayName_, table.getFullName(), aliasStr));
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
if (!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
output.append(detailPrefix + "start key: " + printKey(startKey_) + "\n");
}
if (!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
output.append(detailPrefix + "stop key: " + printKey(stopKey_) + "\n");
}
if (!filters_.isEmpty()) {
output.append(detailPrefix + "hbase filters:");
if (filters_.size() == 1) {
THBaseFilter filter = filters_.get(0);
output.append(" " + filter.family + ":" + filter.qualifier + " " +
CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
"'" + filter.filter_constant + "'");
} else {
for (int i = 0; i < filters_.size(); ++i) {
THBaseFilter filter = filters_.get(i);
output.append("\n " + filter.family + ":" + filter.qualifier + " " +
CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
"'" + filter.filter_constant + "'");
}
}
output.append('\n');
}
if (!conjuncts_.isEmpty()) {
output.append(
detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
}
}
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(getStatsExplainString(detailPrefix, detailLevel));
output.append("\n");
}
return output.toString();
}
/**
* Convert key into byte array and append a '\0' if 'nextKey' is true.
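* Appending '\0' produces the smallest key strictly greater than 'rowKey'; e.g.,
* convertToBytes("abc", true) returns the bytes of "abc" followed by a '\0' byte.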
*/
private byte[] convertToBytes(String rowKey, boolean nextKey) {
byte[] keyBytes = Bytes.toBytes(rowKey);
if (!nextKey) {
return keyBytes;
} else {
// append \0
return Arrays.copyOf(keyBytes, keyBytes.length + 1);
}
}
/**
* Prints non-printable characters in escaped octal, otherwise outputs
* the characters.
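* E.g., printKey(new byte[] {'k', 'e', 'y', 0x1}) returns "key\1".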
*/
public static String printKey(byte[] key) {
StringBuilder result = new StringBuilder();
for (int i = 0; i < key.length; ++i) {
if (!Character.isISOControl(key[i])) {
result.append((char) key[i]);
} else {
result.append("\\");
result.append(Integer.toOctalString(key[i]));
}
}
return result.toString();
}
private static CompareFilter.CompareOp impalaOpToHBaseOp(
BinaryPredicate.Operator impalaOp) {
switch(impalaOp) {
case EQ: return CompareFilter.CompareOp.EQUAL;
case NE: return CompareFilter.CompareOp.NOT_EQUAL;
case GT: return CompareFilter.CompareOp.GREATER;
case GE: return CompareFilter.CompareOp.GREATER_OR_EQUAL;
case LT: return CompareFilter.CompareOp.LESS;
case LE: return CompareFilter.CompareOp.LESS_OR_EQUAL;
// TODO: Add support for pushing LIKE/REGEX down to HBase with a different Filter.
// Return null for unsupported operators so that createHBaseFilters() can skip
// them ("Ignore unsupported ops") instead of aborting the query.
default: return null;
}
}
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// TODO: What's a good estimate of memory consumption?
nodeResourceProfile_ = ResourceProfile.noReservation(1024L * 1024L * 1024L);
}
/**
* Returns the per-host upper bound of memory that any number of concurrent scan nodes
* will use. Used for estimating the per-host memory requirement of queries.
*/
public static long getPerHostMemUpperBound() {
// TODO: What's a good estimate of memory consumption?
return 1024L * 1024L * 1024L;
}
@Override
public boolean hasStorageLayerConjuncts() { return !filters_.isEmpty(); }
}