// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.StringLiteral;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HBaseColumn;
import org.apache.impala.catalog.PrimitiveType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.THBaseFilter;
import org.apache.impala.thrift.THBaseKeyRange;
import org.apache.impala.thrift.THBaseScanNode;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRange;
import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.util.BitUtil;
import org.apache.impala.util.ExecutorMembershipSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Full scan of an HBase table.
* Only families/qualifiers specified in TupleDescriptor will be retrieved in the backend.
*/
public class HBaseScanNode extends ScanNode {
// The suggested value for "hbase.client.scan.setCaching", which batches maxCaching
// rows per fetch request to the HBase region server. If the value is too high,
// then the hbase region server will have a hard time (GC pressure and long response
// times). If the value is too small, then there will be extra trips to the hbase
// region server.
// Default to 1024 and update it based on row size estimate such that each batch size
// won't exceed 500MB.
private final static int MAX_HBASE_FETCH_BATCH_SIZE = 500 * 1024 * 1024;
private final static int DEFAULT_SUGGESTED_CACHING = 1024;
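// Illustrative arithmetic (hypothetical row sizes, not taken from the code below):
// with an estimated row size of ~4 KB, computeStats() suggests a caching value of
// max(500MB / 4KB, 1) = 128,000 rows per fetch; a very wide ~1 MB row would instead
// yield max(500MB / 1MB, 1) = 500.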
// Used for memory estimation when the column max size stat is missing (happens only
// in case of string type columns).
private final static int DEFAULT_STRING_COL_BYTES = 32 * 1024;
// Used for memory estimation to clamp the max estimate to 128 MB in case of
// missing stats.
private final static int DEFAULT_MAX_ESTIMATE_BYTES = 128 * 1024 * 1024;
// Used for memory estimation to clamp the min estimate to 4 KB which is min
// block size that can be allocated by the mem-pool.
private final static int DEFAULT_MIN_ESTIMATE_BYTES = 4 * 1024;
private final static Logger LOG = LoggerFactory.getLogger(HBaseScanNode.class);
private final TupleDescriptor desc_;
// One range per clustering column. The range bounds are expected to be constants.
// A null entry means there's no range restriction for that particular key.
// If keyRanges is non-null it always contains as many entries as there are clustering
// cols. Don't replace this variable after init().
private List<ValueRange> keyRanges_ = new ArrayList<>();
// The list of conjuncts used to create the key ranges. Used if we must estimate
// cardinality based on row count stats. Don't replace this variable after init().
private List<Expr> keyConjuncts_ = new ArrayList<>();
// derived from keyRanges_; empty means unbounded;
// initialize start/stopKey_ to be unbounded.
private byte[] startKey_ = HConstants.EMPTY_START_ROW;
private byte[] stopKey_ = HConstants.EMPTY_END_ROW;
// True if this scan node is not going to scan anything. If the row key filter
// evaluates to null, or if the lower bound > upper bound, then this scan node won't
// scan at all.
private boolean isEmpty_ = false;
// List of HBase Filters for generating thrift message. Filled in finalize().
private final List<THBaseFilter> filters_ = new ArrayList<>();
private int suggestedCaching_ = DEFAULT_SUGGESTED_CACHING;
public HBaseScanNode(PlanNodeId id, TupleDescriptor desc) {
super(id, desc, "SCAN HBASE");
desc_ = desc;
}
@Override
public void init(Analyzer analyzer) throws ImpalaException {
FeTable table = desc_.getTable();
// determine scan predicates for clustering cols
for (int i = 0; i < table.getNumClusteringCols(); ++i) {
SlotDescriptor slotDesc = analyzer.getColumnSlot(
desc_, table.getColumns().get(i));
if (slotDesc == null || !slotDesc.getType().isStringType()) {
// the hbase row key is mapped to a non-string type
// (since it's stored in ASCII it will be lexicographically ordered,
// and non-string comparisons won't work)
keyRanges_.add(null);
} else {
keyRanges_.add(createHBaseValueRange(slotDesc));
}
}
checkForSupportedFileFormats();
assignConjuncts(analyzer);
conjuncts_ = orderConjunctsByCost(conjuncts_);
setStartStopKey(analyzer);
// Convert predicates to HBase filters_.
createHBaseFilters(analyzer);
// materialize slots in remaining conjuncts_
analyzer.materializeSlots(conjuncts_);
computeMemLayout(analyzer);
computeScanRangeLocations(analyzer);
Preconditions.checkState(!scanRangeSpecs_.isSetSplit_specs());
// Make sure key ranges are not changed any more. So startKey_ and stopKey_ are
// stable too. These invariants make it safe to reuse row estimation results in
// computeStats.
keyRanges_ = Collections.unmodifiableList(keyRanges_);
keyConjuncts_ = Collections.unmodifiableList(keyConjuncts_);
// Call computeStats() after materializing slots and computing the mem layout.
computeStats(analyzer);
}
/**
* Transform '=', '<[=]' and '>[=]' comparisons for given slot into
* ValueRange. Also removes those predicates which were used for the construction
* of ValueRange from 'conjuncts_'. Only looks at comparisons w/ string constants
* (i.e., the bounds of the result can be evaluated with Expr::GetValue(NULL)).
* HBase row key filtering works only if the row key is mapped to a string column and
* the expression is a string constant expression.
* If there are multiple competing comparison predicates that could be used
* to construct a ValueRange, only the first one from each category is chosen.
*/
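// Illustrative example (hypothetical key column and literals): given
// conjuncts_ = [rowkey >= 'aaa', rowkey < 'mmm', rowkey != 'zzz', col1 = 10],
// this returns a ValueRange with inclusive lower bound 'aaa' and exclusive upper
// bound 'mmm', moves those two predicates into keyConjuncts_, and leaves
// rowkey != 'zzz' and col1 = 10 in conjuncts_ to be evaluated at the scan node.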
private ValueRange createHBaseValueRange(SlotDescriptor d) {
ListIterator<Expr> i = conjuncts_.listIterator();
ValueRange result = null;
while (i.hasNext()) {
Expr e = i.next();
if (!(e instanceof BinaryPredicate)) continue;
BinaryPredicate comp = (BinaryPredicate) e;
if ((comp.getOp() == BinaryPredicate.Operator.NE)
|| (comp.getOp() == BinaryPredicate.Operator.DISTINCT_FROM)
|| (comp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT)) {
continue;
}
Expr slotBinding = comp.getSlotBinding(d.getId());
if (slotBinding == null || !slotBinding.isConstant() ||
!slotBinding.getType().equals(Type.STRING)) {
continue;
}
if (comp.getOp() == BinaryPredicate.Operator.EQ) {
i.remove();
keyConjuncts_.add(e);
return ValueRange.createEqRange(slotBinding);
}
if (result == null) result = new ValueRange();
// TODO: do we need copies here?
if (comp.getOp() == BinaryPredicate.Operator.GT
|| comp.getOp() == BinaryPredicate.Operator.GE) {
if (result.getLowerBound() == null) {
result.setLowerBound(slotBinding);
result.setLowerBoundInclusive(comp.getOp() == BinaryPredicate.Operator.GE);
i.remove();
keyConjuncts_.add(e);
}
} else {
if (result.getUpperBound() == null) {
result.setUpperBound(slotBinding);
result.setUpperBoundInclusive(comp.getOp() == BinaryPredicate.Operator.LE);
i.remove();
keyConjuncts_.add(e);
}
}
}
return result;
}
/**
* Convert keyRanges_ to startKey_ and stopKey_.
* If ValueRange is not null, transform it into start/stopKey_ by evaluating the
* expression. Analysis has checked that the expression is string type. If the
* expression evaluates to null, then there's nothing to scan because the HBase row key
* cannot be null.
* At present, we only do row key filtering for string-mapped keys. String-mapped keys
* are always encoded as ASCII.
* ValueRange is null if there is no predicate on the row-key.
*/
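// Illustrative example (hypothetical literals): a ValueRange with exclusive lower
// bound 'abc' and inclusive upper bound 'abd' becomes startKey_ = "abc\0" (the
// smallest key sorting after "abc") and stopKey_ = "abd\0" (HBase stop keys are
// exclusive, so the trailing NUL keeps "abd" itself inside the scan).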
private void setStartStopKey(Analyzer analyzer) throws ImpalaException {
Preconditions.checkNotNull(keyRanges_);
Preconditions.checkState(keyRanges_.size() == 1);
ValueRange rowRange = keyRanges_.get(0);
if (rowRange != null) {
if (rowRange.getLowerBound() != null) {
Preconditions.checkState(rowRange.getLowerBound().isConstant());
Preconditions.checkState(
rowRange.getLowerBound().getType().equals(Type.STRING));
LiteralExpr val = LiteralExpr.createBounded(rowRange.getLowerBound(),
analyzer.getQueryCtx(), StringLiteral.MAX_STRING_LEN);
// TODO: Make this a Preconditions.checkState(). If we get here,
// and the value is not a string literal, then we've got a predicate
// that we removed from the conjunct list, but which we won't evaluate
// as a key. That is, we'll produce wrong query results.
if (val instanceof StringLiteral) {
StringLiteral litVal = (StringLiteral) val;
startKey_ = convertToBytes(litVal.getUnescapedValue(),
!rowRange.getLowerBoundInclusive());
} else {
// lower bound is null.
isEmpty_ = true;
return;
}
}
if (rowRange.getUpperBound() != null) {
Preconditions.checkState(rowRange.getUpperBound().isConstant());
Preconditions.checkState(
rowRange.getUpperBound().getType().equals(Type.STRING));
LiteralExpr val = LiteralExpr.createBounded(rowRange.getUpperBound(),
analyzer.getQueryCtx(), StringLiteral.MAX_STRING_LEN);
if (val instanceof StringLiteral) {
StringLiteral litVal = (StringLiteral) val;
stopKey_ = convertToBytes(litVal.getUnescapedValue(),
rowRange.getUpperBoundInclusive());
} else {
// upper bound is null.
isEmpty_ = true;
return;
}
}
}
boolean endKeyIsEndOfTable = Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey_, stopKey_) > 0) && !endKeyIsEndOfTable) {
// Lower bound is greater than upper bound.
isEmpty_ = true;
}
}
/**
* Also sets suggestedCaching_.
*/
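// Rough sketch of the estimation order implemented below: an empty key range gives
// cardinality 0; an equality range on the row key gives 1; otherwise reuse a prior
// estimate if init() already produced one, else try the HBase sampling estimate,
// and finally fall back to HMS num_rows scaled by the key conjuncts' selectivity.
// The result is then adjusted for the remaining conjuncts and capped at the limit.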
@Override
public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
if (LOG.isTraceEnabled()) {
LOG.trace("computing stats for HbaseScan on " + tbl.getHBaseTableName());
}
ValueRange rowRange = keyRanges_.get(0);
if (isEmpty_) {
cardinality_ = 0;
} else if (rowRange != null && rowRange.isEqRange()) {
cardinality_ = 1;
} else if (inputCardinality_ >= 0) {
// We have run computeStats successfully. Don't need to estimate cardinality again
// (IMPALA-8912). Check some invariants if computeStats has been called.
Preconditions.checkState(numNodes_ > 0);
Preconditions.checkState(cardinality_ >= 0);
cardinality_ = inputCardinality_;
if (LOG.isTraceEnabled()) {
LOG.trace("Reuse last stats: inputCardinality_=" + inputCardinality_);
}
} else {
Pair<Long, Long> estimate;
if (analyzer.getQueryOptions().isDisable_hbase_num_rows_estimate()) {
estimate = new Pair<>(-1L, -1L);
} else {
// Set maxCaching so that each fetch from hbase won't return a batch of more than
// MAX_HBASE_FETCH_BATCH_SIZE bytes.
// May return -1 for the estimate if insufficient data is available.
estimate = tbl.getEstimatedRowStats(startKey_, stopKey_);
}
if (estimate.first == -1) {
// No useful estimate. Rely on HMS row count stats.
// This works only if HBase stats are available in HMS. This is true
// for the Impala tests, and may be true for some applications.
cardinality_ = tbl.getTTableStats().getNum_rows();
if (LOG.isTraceEnabled()) {
LOG.trace("Fallback to use table stats in HMS: num_rows=" + cardinality_);
}
// TODO: What to do if neither HBase nor HMS provides a row count estimate?
// Is there some third, ultimate fallback?
// Apply estimated key range selectivity from original key conjuncts
if (cardinality_ > 0 && keyConjuncts_ != null) {
cardinality_ =
applySelectivity(cardinality_, computeCombinedSelectivity(keyConjuncts_));
}
} else {
// Use the HBase sampling scan to estimate cardinality. Note that,
// in tests, this estimate has proven to be very rough: off by
// 2x or more.
cardinality_ = estimate.first;
if (estimate.second > 0) {
suggestedCaching_ = (int)
Math.max(MAX_HBASE_FETCH_BATCH_SIZE / estimate.second, 1);
}
}
}
inputCardinality_ = cardinality_;
if (cardinality_ > 0) {
cardinality_ = applyConjunctsSelectivity(cardinality_);
} else {
// Safeguard against cardinality_ < -1, e.g. when HBase sampling fails and numRows
// in HMS is abnormally set to a value < -1.
cardinality_ = Math.max(-1, cardinality_);
}
cardinality_ = capCardinalityAtLimit(cardinality_);
if (LOG.isTraceEnabled()) {
LOG.trace("computeStats HbaseScan: cardinality=" + cardinality_);
}
// Assume that each executor in the cluster gets a scan range, unless there are fewer
// scan ranges than nodes.
numNodes_ = Math.max(1, Math.min(scanRangeSpecs_.getConcrete_rangesSize(),
ExecutorMembershipSnapshot.getCluster().numExecutors()));
if (LOG.isTraceEnabled()) {
LOG.trace("computeStats HbaseScan: #nodes=" + numNodes_);
}
}
@Override
protected String debugString() {
FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
return Objects.toStringHelper(this)
.add("tid", desc_.getId().asInt())
.add("hiveTblName", tbl.getFullName())
.add("hbaseTblName", tbl.getHBaseTableName())
.add("startKey", ByteBuffer.wrap(startKey_).toString())
.add("stopKey", ByteBuffer.wrap(stopKey_).toString())
.add("isEmpty", isEmpty_)
.addValue(super.debugString())
.toString();
}
// We convert predicates of the form <slotref> op <constant> where slotref is of
// type string to HBase filters. All these predicates are also evaluated at
// the HBaseScanNode. To properly filter out NULL values HBaseScanNode treats all
// predicates as disjunctive, thereby requiring re-evaluation when there are multiple
// attributes. We explicitly materialize the referenced slots, otherwise our hbase
// scans don't return correct data.
// TODO: expand this to generate nested filter lists for arbitrary conjunctions
// and disjunctions.
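// Illustrative example (hypothetical family/qualifier): a conjunct such as
// event_type = 'click', where the column maps to family "cols" and qualifier
// "event_type", becomes THBaseFilter(family="cols", op_ordinal=EQUAL.ordinal(),
// filter_constant="click") with the qualifier set via setQualifier(); the slot is
// also materialized so the predicate can be re-evaluated by the scan node.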
private void createHBaseFilters(Analyzer analyzer) {
for (Expr e: conjuncts_) {
// We only consider binary predicates
if (!(e instanceof BinaryPredicate)) continue;
BinaryPredicate bp = (BinaryPredicate) e;
CompareFilter.CompareOp hbaseOp = impalaOpToHBaseOp(bp.getOp());
// Ignore unsupported ops
if (hbaseOp == null) continue;
for (SlotDescriptor slot: desc_.getSlots()) {
// Only push down predicates on string columns
if (slot.getType().getPrimitiveType() != PrimitiveType.STRING) continue;
Expr bindingExpr = bp.getSlotBinding(slot.getId());
if (bindingExpr == null || !(bindingExpr instanceof StringLiteral)) continue;
StringLiteral literal = (StringLiteral) bindingExpr;
HBaseColumn col = (HBaseColumn) slot.getColumn();
// IMPALA-7929: Since the qualifier can be null (e.g. for the key column of an
// HBase table), the qualifier field must be optional in order to express the
// null value. Constructors in Thrift do not set optional fields, so the qualifier
// must be set separately.
THBaseFilter thbf = new THBaseFilter(col.getColumnFamily(),
(byte) hbaseOp.ordinal(), literal.getUnescapedValue());
thbf.setQualifier(col.getColumnQualifier());
filters_.add(thbf);
analyzer.materializeSlots(Lists.newArrayList(e));
}
}
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.HBASE_SCAN_NODE;
FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
msg.hbase_scan_node =
new THBaseScanNode(desc_.getId().asInt(), tbl.getHBaseTableName());
if (!filters_.isEmpty()) {
msg.hbase_scan_node.setFilters(filters_);
}
msg.hbase_scan_node.setSuggested_max_caching(suggestedCaching_);
}
/**
* We create a TScanRange for each region server that contains at least one
* relevant region, and the created TScanRange will contain all the relevant regions
* of that region server.
*/
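// Illustrative example (hypothetical regions and hosts): for regions [a,b) and
// [b,c) hosted on server1 and region [c,d) hosted on server2, the two adjacent
// regions on server1 collapse into a single THBaseKeyRange [a,c), producing one
// scan range per server: [a,c) -> server1 and [c,d) -> server2.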
private void computeScanRangeLocations(Analyzer analyzer) {
scanRangeSpecs_ = new TScanRangeSpec();
// For empty scan node, return an empty list.
if (isEmpty_) return;
// Retrieve relevant HBase regions and their region servers
FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
List<HRegionLocation> regionsLoc;
try {
regionsLoc = FeHBaseTable.Util.getRegionsInRange(tbl, startKey_, stopKey_);
} catch (IOException e) {
throw new RuntimeException(
"couldn't retrieve HBase table (" + tbl.getHBaseTableName() + ") info:\n"
+ e.getMessage(), e);
}
// Convert list of HRegionLocation to Map<hostport, List<HRegionLocation>>.
// The List<HRegionLocation>s end up sorted by start key/end key, because
// regionsLoc is sorted that way.
Map<String, List<HRegionLocation>> locationMap = new HashMap<>();
for (HRegionLocation regionLoc: regionsLoc) {
String locHostPort = regionLoc.getHostnamePort();
if (locationMap.containsKey(locHostPort)) {
locationMap.get(locHostPort).add(regionLoc);
} else {
locationMap.put(locHostPort, Lists.newArrayList(regionLoc));
}
}
for (Map.Entry<String, List<HRegionLocation>> locEntry: locationMap.entrySet()) {
// HBaseTableScanner(backend) initializes a result scanner for each key range.
// To minimize # of result scanner re-init, create only a single HBaseKeyRange
// for all adjacent regions on this server.
THBaseKeyRange keyRange = null;
byte[] prevEndKey = null;
for (HRegionLocation regionLoc: locEntry.getValue()) {
byte[] curRegStartKey = regionLoc.getRegionInfo().getStartKey();
byte[] curRegEndKey = regionLoc.getRegionInfo().getEndKey();
if (prevEndKey != null &&
Bytes.compareTo(prevEndKey, curRegStartKey) == 0) {
// the current region starts where the previous one left off;
// extend the key range
setKeyRangeEnd(keyRange, curRegEndKey);
} else {
// create a new HBaseKeyRange (and the TScanRange/TScanRangeLocationList to go
// with it).
keyRange = new THBaseKeyRange();
setKeyRangeStart(keyRange, curRegStartKey);
setKeyRangeEnd(keyRange, curRegEndKey);
TScanRangeLocationList scanRangeLocation = new TScanRangeLocationList();
TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey());
scanRangeLocation.addToLocations(
new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));
TScanRange scanRange = new TScanRange();
scanRange.setHbase_key_range(keyRange);
scanRangeLocation.setScan_range(scanRange);
scanRangeSpecs_.addToConcrete_ranges(scanRangeLocation);
}
prevEndKey = curRegEndKey;
}
}
}
/**
* Set the start key of keyRange using the provided key, bounded by startKey_
* @param keyRange the keyRange to be updated
* @param rangeStartKey the start key value to be set to
*/
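// Illustrative example (hypothetical keys): with startKey_ = "k", a region starting
// at "" keeps the tighter bound "k", while a region starting at "m" keeps "m"; if
// both the region start key and startKey_ are empty, the start key is left unset
// and the scan starts at the beginning of the table.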
private void setKeyRangeStart(THBaseKeyRange keyRange, byte[] rangeStartKey) {
keyRange.unsetStartKey();
// use the max(startKey_, rangeStartKey) for the scan start
if (!Bytes.equals(rangeStartKey, HConstants.EMPTY_START_ROW) ||
!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
byte[] partStart = (Bytes.compareTo(rangeStartKey, startKey_) < 0) ?
startKey_ : rangeStartKey;
keyRange.setStartKey(Bytes.toString(partStart));
}
}
/**
* Set the end key of keyRange using the provided key, bounded by stopKey_
* @param keyRange the keyRange to be updated
* @param rangeEndKey the end key value to be set to
*/
private void setKeyRangeEnd(THBaseKeyRange keyRange, byte[] rangeEndKey) {
keyRange.unsetStopKey();
// use the min(stopKey_, rangeEndKey) for the scan stop
if (!Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW) ||
!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
if (Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
keyRange.setStopKey(Bytes.toString(rangeEndKey));
} else if (Bytes.equals(rangeEndKey, HConstants.EMPTY_END_ROW)) {
keyRange.setStopKey(Bytes.toString(stopKey_));
} else {
byte[] partEnd = (Bytes.compareTo(rangeEndKey, stopKey_) < 0) ?
rangeEndKey : stopKey_;
keyRange.setStopKey(Bytes.toString(partEnd));
}
}
}
@Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
FeHBaseTable table = (FeHBaseTable) desc_.getTable();
StringBuilder output = new StringBuilder();
if (isEmpty_) {
output.append(prefix + "empty scan node\n");
return output.toString();
}
String aliasStr = "";
if (!table.getFullName().equalsIgnoreCase(desc_.getAlias()) &&
!table.getName().equalsIgnoreCase(desc_.getAlias())) {
aliasStr = " " + desc_.getAlias();
}
output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(),
displayName_, table.getFullName(), aliasStr));
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
if (!keyConjuncts_.isEmpty()) {
output.append(detailPrefix + "key predicates: " +
Expr.getExplainString(keyConjuncts_, detailLevel) + "\n");
}
if (!Bytes.equals(startKey_, HConstants.EMPTY_START_ROW)) {
output.append(detailPrefix + "start key: " + printKey(startKey_) + "\n");
}
if (!Bytes.equals(stopKey_, HConstants.EMPTY_END_ROW)) {
output.append(detailPrefix + "stop key: " + printKey(stopKey_) + "\n");
}
if (!filters_.isEmpty()) {
output.append(detailPrefix + "hbase filters:");
if (filters_.size() == 1) {
THBaseFilter filter = filters_.get(0);
output.append(" " + filter.family + ":" + filter.qualifier + " " +
CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
"'" + filter.filter_constant + "'");
} else {
for (int i = 0; i < filters_.size(); ++i) {
THBaseFilter filter = filters_.get(i);
output.append("\n " + filter.family + ":" + filter.qualifier + " " +
CompareFilter.CompareOp.values()[filter.op_ordinal].toString() + " " +
"'" + filter.filter_constant + "'");
}
}
output.append('\n');
}
if (!conjuncts_.isEmpty()) {
output.append(detailPrefix
+ "predicates: " + Expr.getExplainString(conjuncts_, detailLevel) + "\n");
}
}
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(getStatsExplainString(detailPrefix));
output.append("\n");
}
return output.toString();
}
/**
* Convert key into byte array and append a '\0' if 'nextKey' is true.
*/
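// Illustrative example: convertToBytes("abc", false) returns {'a','b','c'}, while
// convertToBytes("abc", true) returns {'a','b','c','\0'}, i.e. the smallest row key
// that sorts strictly after "abc" in HBase's byte ordering.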
private byte[] convertToBytes(String rowKey, boolean nextKey) {
byte[] keyBytes = Bytes.toBytes(rowKey);
if (!nextKey) {
return keyBytes;
} else {
// append \0
return Arrays.copyOf(keyBytes, keyBytes.length + 1);
}
}
/**
* Prints non-printable characters in escaped octal, otherwise outputs
* the characters.
*/
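// Illustrative example: printKey(new byte[] {'a', 'b', 0x00}) returns "ab\0", since
// the control byte 0x00 is escaped as a backslash followed by its octal value.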
public static String printKey(byte[] key) {
StringBuilder result = new StringBuilder();
for (int i = 0; i < key.length; ++i) {
if (!Character.isISOControl(key[i])) {
result.append((char) key[i]);
} else {
result.append("\\");
result.append(Integer.toOctalString(key[i]));
}
}
return result.toString();
}
private static CompareFilter.CompareOp impalaOpToHBaseOp(
BinaryPredicate.Operator impalaOp) {
switch(impalaOp) {
case EQ: return CompareFilter.CompareOp.EQUAL;
case NE: return CompareFilter.CompareOp.NOT_EQUAL;
case GT: return CompareFilter.CompareOp.GREATER;
case GE: return CompareFilter.CompareOp.GREATER_OR_EQUAL;
case LT: return CompareFilter.CompareOp.LESS;
case LE: return CompareFilter.CompareOp.LESS_OR_EQUAL;
// TODO: Add support for pushing LIKE/REGEX down to HBase with a different Filter.
default: throw new IllegalArgumentException(
"HBase: Unsupported Impala compare operator: " + impalaOp);
}
}
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
FeHBaseTable tbl = (FeHBaseTable) desc_.getTable();
// The first column in an HBase table is always the key column.
HBaseColumn keyCol = (HBaseColumn) tbl.getColumns().get(0);
List<HBaseColumn> colsToFetchFromHBase = new ArrayList<>();
for (SlotDescriptor slot : desc_.getSlots()) {
HBaseColumn col = (HBaseColumn) tbl.getColumn(slot.getLabel());
// The key column is added separately below, since it's always fetched.
if (col.getColumnFamily().equals(FeHBaseTable.Util.ROW_KEY_COLUMN_FAMILY)) continue;
colsToFetchFromHBase.add(col);
}
// Add the key column.
colsToFetchFromHBase.add(keyCol);
long mem_estimate = memoryEstimateForFetchingColumns(colsToFetchFromHBase);
mem_estimate = Math.max(mem_estimate, DEFAULT_MIN_ESTIMATE_BYTES);
nodeResourceProfile_ = ResourceProfile.noReservation(mem_estimate);
}
/**
* Returns an estimate of memory required by the HBase scan node for fetching
* the given list of HBase columns. Primarily used as a helper function but
* also exposed at the package level for testing.
*/
protected static long memoryEstimateForFetchingColumns(List<HBaseColumn> columns) {
// In HBase, every column value is stored in the following format:
// http://hbase.apache.org/0.94/book/regions.arch.html#keyvalue
// and out of this only rowKey per row and (columnFamily, columnQualifier and
// columnValue) per column per row are allocated. The following estimations are based
// on that and its interaction with the mem-pool. Currently, we do an
// allocate-clear cycle on mem-pool for each row fetched from HBase (See
// HBaseTableScanner::Next()). To get an approx upper limit on mem
// allocation, we take the max row size possible and assume all possible
// chunk sizes below it have been allocated in previous row iterations. So,
// for a value of n bytes the max possible mem allocation for a mem pool
// that only uses power of 2 chunk sizes will be:
// 2^(ceil(log2(n)) + 1) - 1 ~ 2 * BitUtil.roundUpToPowerOf2(n)
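// Illustrative worked example (hypothetical stats): if this helper were given a
// single non-key string column in family "d" with qualifier "col" and a max-size
// stat of 3000 bytes, the size rounds up to 4096, plus 1 + 3 bytes for the family
// and qualifier strings gives maxRowSize = 4100, and the estimate is
// roundUpToPowerOf2(4100) * 2 = 16384 bytes. The 128 MB cap only applies when a
// string column is missing its max-size stat; the 4 KB floor is applied by
// computeNodeResourceProfile().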
long maxRowSize = 0;
boolean isMissingStats = false;
for (HBaseColumn col : columns) {
long colMaxSize = col.getStats().getMaxSize();
if (col.getType().isStringType()) {
if (colMaxSize == -1) {
colMaxSize = DEFAULT_STRING_COL_BYTES;
isMissingStats = true;
}
// Round off string col size to next power of 2. For strings less than
// 512 KB (MemPool::MAX_CHUNK_SIZE) this ensures enough contribution to
// the max row size so that the final round off can accommodate any
// fluctuations in size. For larger strings, this approximates that it
// completely takes up the new chunk allocated for it.
colMaxSize = BitUtil.roundUpToPowerOf2(colMaxSize);
}
Preconditions.checkState(colMaxSize != -1);
maxRowSize += colMaxSize;
if (!col.getColumnFamily().equals(FeHBaseTable.Util.ROW_KEY_COLUMN_FAMILY)) {
// For non-key columns, their respective column family and column qualifier
// strings need to be fetched and mem needs to be allocated for that too.
maxRowSize += col.getColumnFamily().length() + col.getColumnQualifier().length();
}
}
// Use the max allocation assuming all possible chunk sizes below it have
// been allocated.
long mem_estimate = BitUtil.roundUpToPowerOf2(maxRowSize) * 2;
// We use a default of 32 KB for string cols that don't have a max length set.
// Assuming such large cols are uncommon we set an upper limit to avoid extreme
// overestimation.
if (isMissingStats) mem_estimate = Math.min(mem_estimate, DEFAULT_MAX_ESTIMATE_BYTES);
return mem_estimate;
}
@Override
public boolean hasStorageLayerConjuncts() { return !filters_.isEmpty(); }
}