blob: 6cdf9492caecb24f05a5699fb3811a056d4f6056 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.index;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.ExpressionStringBuilder;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.hbase.HBaseRegexParser;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
import org.apache.hadoop.hbase.HConstants;
import org.ojai.store.QueryCondition;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class MapRDBStatistics implements Statistics {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBStatistics.class);
static final String nullConditionAsString = "<NULL>";
private double rowKeyJoinBackIOFactor = 1.0;
private boolean statsAvailable = false;
private StatisticsPayload fullTableScanPayload = null;
/*
* The computed statistics are cached in <statsCache> so that any subsequent calls are returned
* from the cache. The <statsCache> is a map of <RexNode, map<Index, Stats Payload>>. The <RexNode>
* does not have a comparator so it is converted to a String for serving as a Map key. This may result
* in logically equivalent conditions considered differently e.g. sal<10 OR sal>100, sal>100 OR sal<10
* the second map maintains statistics per index as not all statistics are independent of the index
* e.g. average row size.
*/
private Map<String, Map<String, StatisticsPayload>> statsCache;
/*
* The filter independent computed statistics are cached in <fIStatsCache> so that any subsequent
* calls are returned from the cache. The <fIStatsCache> is a map of <Index, Stats Payload>. This
* cache maintains statistics per index as not all statistics are independent of the index
* e.g. average row size.
*/
private Map<String, StatisticsPayload> fIStatsCache;
/*
/*
* The mapping between <QueryCondition> and <RexNode> is kept in <conditionRexNodeMap>. This mapping
* is useful to obtain rowCount for condition specified as <QueryCondition> required during physical
* planning. Again, both the <QueryCondition> and <RexNode> are converted to Strings for the lack
* of a comparator.
*/
private Map<String, String> conditionRexNodeMap;
public MapRDBStatistics() {
statsCache = new HashMap<>();
fIStatsCache = new HashMap<>();
conditionRexNodeMap = new HashMap<>();
}
public double getRowKeyJoinBackIOFactor() {
return rowKeyJoinBackIOFactor;
}
@Override
public boolean isStatsAvailable() {
return statsAvailable;
}
@Override
public String buildUniqueIndexIdentifier(IndexDescriptor idx) {
if (idx == null) {
return null;
} else {
return idx.getTableName() + "_" + idx.getIndexName();
}
}
public String buildUniqueIndexIdentifier(String tableName, String idxName) {
if (tableName == null || idxName == null) {
return null;
} else {
return tableName + "_" + idxName;
}
}
@Override
/** Returns the number of rows satisfying the given FILTER condition
* @param condition - FILTER specified as a {@link RexNode}
* @param tabIdxName - The table/index identifier
* @return approximate rows satisfying the filter
*/
public double getRowCount(RexNode condition, String tabIdxName, RelNode scanRel) {
String conditionAsStr = nullConditionAsString;
Map<String, StatisticsPayload> payloadMap;
if ((scanRel instanceof DrillScanRel && ((DrillScanRel)scanRel).getGroupScan() instanceof DbGroupScan)
|| (scanRel instanceof ScanPrel && ((ScanPrel)scanRel).getGroupScan() instanceof DbGroupScan)) {
if (condition == null && fullTableScanPayload != null) {
return fullTableScanPayload.getRowCount();
} else if (condition != null) {
conditionAsStr = convertRexToString(condition, scanRel.getRowType());
payloadMap = statsCache.get(conditionAsStr);
if (payloadMap != null) {
if (payloadMap.get(tabIdxName) != null) {
return payloadMap.get(tabIdxName).getRowCount();
} else {
// We might not have computed rowcount for the given condition from the tab/index in question.
// For rowcount it does not matter which index was used to get the rowcount for the given condition.
// Hence, just use the first one!
for (String payloadKey : payloadMap.keySet()) {
if (payloadKey != null && payloadMap.get(payloadKey) != null) {
return payloadMap.get(payloadKey).getRowCount();
}
}
StatisticsPayload anyPayload = payloadMap.entrySet().iterator().next().getValue();
return anyPayload.getRowCount();
}
}
}
}
if (statsAvailable) {
logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
}
return ROWCOUNT_UNKNOWN;
}
/** Returns the number of rows satisfying the given FILTER condition
* @param condition - FILTER specified as a {@link QueryCondition}
* @param tabIdxName - The table/index identifier
* @return approximate rows satisfying the filter
*/
public double getRowCount(QueryCondition condition, String tabIdxName) {
String conditionAsStr = nullConditionAsString;
Map<String, StatisticsPayload> payloadMap;
if (condition != null
&& conditionRexNodeMap.get(condition.toString()) != null) {
String rexConditionAsString = conditionRexNodeMap.get(condition.toString());
payloadMap = statsCache.get(rexConditionAsString);
if (payloadMap != null) {
if (payloadMap.get(tabIdxName) != null) {
return payloadMap.get(tabIdxName).getRowCount();
} else {
// We might not have computed rowcount for the given condition from the tab/index in question.
// For rowcount it does not matter which index was used to get the rowcount for the given condition.
// if tabIdxName is null, most likely we have found one from payloadMap and won't come to here.
// If we come to here, we are looking for payload for an index, so let us use any index's payload first!
for (String payloadKey : payloadMap.keySet()) {
if (payloadKey != null && payloadMap.get(payloadKey) != null) {
return payloadMap.get(payloadKey).getRowCount();
}
}
StatisticsPayload anyPayload = payloadMap.entrySet().iterator().next().getValue();
return anyPayload.getRowCount();
}
}
} else if (condition == null
&& fullTableScanPayload != null) {
return fullTableScanPayload.getRowCount();
}
if (condition != null) {
conditionAsStr = condition.toString();
}
if (statsAvailable) {
logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
}
return ROWCOUNT_UNKNOWN;
}
/** Returns the number of leading rows satisfying the given FILTER condition
* @param condition - FILTER specified as a {@link RexNode}
* @param tabIdxName - The table/index identifier
* @param scanRel - The current scanRel
* @return approximate rows satisfying the leading filter
*/
@Override
public double getLeadingRowCount(RexNode condition, String tabIdxName, DrillScanRelBase scanRel) {
String conditionAsStr = nullConditionAsString;
Map<String, StatisticsPayload> payloadMap;
if ((scanRel instanceof DrillScanRel && ((DrillScanRel)scanRel).getGroupScan() instanceof DbGroupScan)
|| (scanRel instanceof ScanPrel && ((ScanPrel)scanRel).getGroupScan() instanceof DbGroupScan)) {
if (condition == null && fullTableScanPayload != null) {
return fullTableScanPayload.getLeadingRowCount();
} else if (condition != null) {
conditionAsStr = convertRexToString(condition, scanRel.getRowType());
payloadMap = statsCache.get(conditionAsStr);
if (payloadMap != null) {
if (payloadMap.get(tabIdxName) != null) {
return payloadMap.get(tabIdxName).getLeadingRowCount();
}
// Unlike rowcount, leading rowcount is dependent on the index. So, if tab/idx is
// not found, we are out of luck!
}
}
}
if (statsAvailable) {
logger.debug("Statistics: Leading filter row count is UNKNOWN for filter: {}", conditionAsStr);
}
return ROWCOUNT_UNKNOWN;
}
/** Returns the number of leading rows satisfying the given FILTER condition
* @param condition - FILTER specified as a {@link QueryCondition}
* @param tabIdxName - The table/index identifier
* @return approximate rows satisfying the leading filter
*/
public double getLeadingRowCount(QueryCondition condition, String tabIdxName) {
String conditionAsStr = nullConditionAsString;
Map<String, StatisticsPayload> payloadMap;
if (condition != null
&& conditionRexNodeMap.get(condition.toString()) != null) {
String rexConditionAsString = conditionRexNodeMap.get(condition.toString());
payloadMap = statsCache.get(rexConditionAsString);
if (payloadMap != null) {
if (payloadMap.get(tabIdxName) != null) {
return payloadMap.get(tabIdxName).getLeadingRowCount();
}
// Unlike rowcount, leading rowcount is dependent on the index. So, if tab/idx is
// not found, we are out of luck!
}
} else if (condition == null
&& fullTableScanPayload != null) {
return fullTableScanPayload.getLeadingRowCount();
}
if (condition != null) {
conditionAsStr = condition.toString();
}
if (statsAvailable) {
logger.debug("Statistics: Leading filter row count is UNKNOWN for filter: {}", conditionAsStr);
}
return ROWCOUNT_UNKNOWN;
}
@Override
public double getAvgRowSize(String tabIdxName, boolean isTableScan) {
StatisticsPayload payloadMap;
if (isTableScan && fullTableScanPayload != null) {
return fullTableScanPayload.getAvgRowSize();
} else if (!isTableScan) {
payloadMap = fIStatsCache.get(tabIdxName);
if (payloadMap != null) {
return payloadMap.getAvgRowSize();
}
}
if (statsAvailable) {
logger.debug("Statistics: Average row size is UNKNOWN for table: {}", tabIdxName);
}
return AVG_ROWSIZE_UNKNOWN;
}
public boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context) {
GroupScan scan = IndexPlanUtils.getGroupScan(scanRel);
PlannerSettings settings = PrelUtil.getPlannerSettings(scanRel.getCluster().getPlanner());
rowKeyJoinBackIOFactor = settings.getIndexRowKeyJoinCostFactor();
if (scan instanceof DbGroupScan) {
String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
if (statsCache.get(conditionAsStr) == null) {
IndexCollection indexes = ((DbGroupScan)scan).getSecondaryIndexCollection(scanRel);
populateStats(condition, indexes, scanRel, context);
logger.info("index_plan_info: initialize: scanRel #{} and groupScan {} got fulltable {}, statsCache: {}, fiStatsCache: {}",
scanRel.getId(), System.identityHashCode(scan), fullTableScanPayload, statsCache, fIStatsCache);
return true;
}
}
return false;
}
/**
* This function computes statistics when there is no query condition
* @param jTabGrpScan - The current group scan
* @param indexes - The collection of indexes to use for getting statistics
* @param scanRel - The current scanRel
* @param context - The index plan call context
*/
private void populateStatsForNoFilter(JsonTableGroupScan jTabGrpScan, IndexCollection indexes, RelNode scanRel,
IndexCallContext context) {
// Get the stats payload for full table (has total rows in the table)
StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel);
addToCache(null, null, context, ftsPayload, jTabGrpScan, scanRel, scanRel.getRowType());
addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload);
// Get the stats for all indexes
for (IndexDescriptor idx: indexes) {
StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, idx, scanRel);
StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx);
RelDataType newRowType;
FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
if (functionInfo.hasFunctional()) {
newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
} else {
newRowType = scanRel.getRowType();
}
addToCache(null, idx, context, idxPayload, jTabGrpScan, scanRel, newRowType);
addToCache(idx, idxRowSizePayload, ftsPayload);
}
}
/**
* This is the core statistics function for populating the statistics. The statistics populated correspond to the query
* condition. Based on different types of plans, we would need statistics for different combinations of predicates. Currently,
* we do not have a tree-walker for {@link QueryCondition}. Hence, instead of using the individual predicates stats, to construct
* the stats for the overall predicates, we rely on using the final predicates. Hence, this has a limitation(susceptible) to
* predicate modification post stats generation. Statistics computed/stored are rowcounts, leading rowcounts, average rowsize.
* Rowcounts and leading rowcounts (i.e. corresponding to predicates on the leading index columns) are stored in the statsCache.
* Average rowsizes are stored in the fiStatsCache (FI stands for Filter Independent).
*
* @param condition - The condition for which to obtain statistics
* @param indexes - The collection of indexes to use for getting statistics
* @param scanRel - The current scanRel
* @param context - The index plan call context
*/
private void populateStats(RexNode condition, IndexCollection indexes, DrillScanRelBase scanRel,
IndexCallContext context) {
JsonTableGroupScan jTabGrpScan;
Map<IndexDescriptor, IndexConditionInfo> firstKeyIdxConditionMap;
Map<IndexDescriptor, IndexConditionInfo> idxConditionMap;
/* Map containing the individual base conditions of an ANDed/ORed condition and their selectivities.
* This is used to compute the overall selectivity of a complex ANDed/ORed condition using its base
* conditions. Helps prevent over/under estimates and guessed selectivity for ORed predicates.
*/
Map<String, Double> baseConditionMap;
GroupScan grpScan = IndexPlanUtils.getGroupScan(scanRel);
if ((scanRel instanceof DrillScanRel || scanRel instanceof ScanPrel) &&
grpScan instanceof JsonTableGroupScan) {
jTabGrpScan = (JsonTableGroupScan) grpScan;
} else {
logger.debug("Statistics: populateStats exit early - not an instance of JsonTableGroupScan!");
return;
}
if (condition == null) {
populateStatsForNoFilter(jTabGrpScan, indexes, scanRel, context);
statsAvailable = true;
return;
}
RexBuilder builder = scanRel.getCluster().getRexBuilder();
PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster());
// Get the stats payload for full table (has total rows in the table)
StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel);
// Get the average row size for table and all indexes
addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload);
if (ftsPayload == null || ftsPayload.getRowCount() == 0) {
return;
}
for (IndexDescriptor idx : indexes) {
StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx);
addToCache(idx, idxRowSizePayload, ftsPayload);
}
/* Only use indexes with distinct first key */
IndexCollection distFKeyIndexes = distinctFKeyIndexes(indexes, scanRel);
IndexConditionInfo.Builder infoBuilder = IndexConditionInfo.newBuilder(condition,
distFKeyIndexes, builder, scanRel);
idxConditionMap = infoBuilder.getIndexConditionMap();
firstKeyIdxConditionMap = infoBuilder.getFirstKeyIndexConditionMap();
baseConditionMap = new HashMap<>();
for (IndexDescriptor idx : firstKeyIdxConditionMap.keySet()) {
if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) {
continue;
}
RexNode idxCondition = firstKeyIdxConditionMap.get(idx).indexCondition;
/* Use the pre-processed condition only for getting actual statistic from MapR-DB APIs. Use the
* original condition everywhere else (cache store/lookups) since the RexNode condition and its
* corresponding QueryCondition will be used to get statistics. e.g. we convert LIKE into RANGE
* condition to get statistics. However, statistics are always asked for LIKE and NOT the RANGE
*/
RexNode preProcIdxCondition = convertToStatsCondition(idxCondition, idx, context, scanRel,
Arrays.asList(SqlKind.CAST, SqlKind.LIKE));
RelDataType newRowType;
FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
if (functionInfo.hasFunctional()) {
newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
} else {
newRowType = scanRel.getRowType();
}
QueryCondition queryCondition = jTabGrpScan.convertToQueryCondition(
convertToLogicalExpression(preProcIdxCondition, newRowType, settings, builder));
// Cap rows/size at total rows in case of issues with DB APIs
StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(queryCondition, idx, scanRel);
double rowCount = Math.min(idxPayload.getRowCount(), ftsPayload.getRowCount());
double leadingRowCount = Math.min(idxPayload.getLeadingRowCount(), rowCount);
double avgRowSize = Math.min(idxPayload.getAvgRowSize(), ftsPayload.getAvgRowSize());
StatisticsPayload payload = new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize);
addToCache(idxCondition, idx, context, payload, jTabGrpScan, scanRel, newRowType);
addBaseConditions(idxCondition, payload, false, baseConditionMap, scanRel.getRowType());
}
/* Add the row count for index conditions on all indexes. Stats are only computed for leading
* keys but index conditions can be pushed and would be required for access path costing
*/
for (IndexDescriptor idx : idxConditionMap.keySet()) {
if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) {
continue;
}
Map<LogicalExpression, RexNode> leadingPrefixMap = Maps.newHashMap();
double rowCount, leadingRowCount, avgRowSize;
RexNode idxCondition = idxConditionMap.get(idx).indexCondition;
// Ignore conditions which always evaluate to true
if (idxCondition.isAlwaysTrue()) {
continue;
}
RexNode idxIncColCondition = idxConditionMap.get(idx).remainderCondition;
RexNode idxRemColCondition = IndexPlanUtils.getLeadingPrefixMap(leadingPrefixMap, idx.getIndexColumns(), infoBuilder, idxCondition);
RexNode idxLeadColCondition = IndexPlanUtils.getLeadingColumnsFilter(
IndexPlanUtils.getLeadingFilters(leadingPrefixMap, idx.getIndexColumns()), builder);
RexNode idxTotRemColCondition = IndexPlanUtils.getTotalRemainderFilter(idxRemColCondition, idxIncColCondition, builder);
RexNode idxTotColCondition = IndexPlanUtils.getTotalFilter(idxLeadColCondition, idxTotRemColCondition, builder);
FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
RelDataType newRowType = scanRel.getRowType();
if (functionInfo.hasFunctional()) {
newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
}
/* For non-covering plans we would need the index leading condition */
rowCount = ftsPayload.getRowCount() * computeSelectivity(idxLeadColCondition, idx,
ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
leadingRowCount = rowCount;
avgRowSize = fIStatsCache.get(buildUniqueIndexIdentifier(idx)).getAvgRowSize();
addToCache(idxLeadColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
jTabGrpScan, scanRel, newRowType);
/* For covering plans we would need the full condition */
rowCount = ftsPayload.getRowCount() * computeSelectivity(idxTotColCondition, idx,
ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
addToCache(idxTotColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
jTabGrpScan, scanRel, newRowType);
/* For intersect plans we would need the index condition */
rowCount = ftsPayload.getRowCount() * computeSelectivity(idxCondition, idx,
ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
addToCache(idxCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
jTabGrpScan, scanRel, newRowType);
/* Add the rowCount for condition on only included columns - no leading columns here! */
if (idxIncColCondition != null) {
rowCount = ftsPayload.getRowCount() * computeSelectivity(idxIncColCondition, null,
ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
addToCache(idxIncColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, rowCount, avgRowSize),
jTabGrpScan, scanRel, newRowType);
}
}
// Add the rowCount for the complete condition - based on table
double rowCount = ftsPayload.getRowCount() * computeSelectivity(condition, null,
ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
// Here, ftsLeadingKey rowcount is based on _id predicates
StatisticsPayload ftsLeadingKeyPayload = jTabGrpScan.getFirstKeyEstimatedStats(jTabGrpScan.convertToQueryCondition(
convertToLogicalExpression(condition, scanRel.getRowType(), settings, builder)), null, scanRel);
addToCache(condition, null, null, new MapRDBStatisticsPayload(rowCount, ftsLeadingKeyPayload.getRowCount(),
ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType());
// Add the full table rows while we are at it - represented by <NULL> RexNode, <NULL> QueryCondition.
// No ftsLeadingKey so leadingKeyRowcount = totalRowCount
addToCache(null, null, null, new MapRDBStatisticsPayload(ftsPayload.getRowCount(), ftsPayload.getRowCount(),
ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType());
// mark stats has been statsAvailable
statsAvailable = true;
}
private boolean addBaseConditions(RexNode condition, StatisticsPayload payload, boolean redundant,
Map<String, Double> baseConditionMap, RelDataType rowType) {
boolean res = redundant;
if (condition.getKind() == SqlKind.AND) {
for(RexNode pred : RelOptUtil.conjunctions(condition)) {
res = addBaseConditions(pred, payload, res, baseConditionMap, rowType);
}
} else if (condition.getKind() == SqlKind.OR) {
for(RexNode pred : RelOptUtil.disjunctions(condition)) {
res = addBaseConditions(pred, payload, res, baseConditionMap, rowType);
}
} else {
// base condition
String conditionAsStr = convertRexToString(condition, rowType);
if (!redundant) {
baseConditionMap.put(conditionAsStr, payload.getRowCount());
return true;
} else {
baseConditionMap.put(conditionAsStr, -1.0);
return false;
}
}
return res;
}
/*
* Adds the statistic(row count) to the cache. Also adds the corresponding QueryCondition->RexNode
* condition mapping.
*/
private void addToCache(RexNode condition, IndexDescriptor idx, IndexCallContext context,
StatisticsPayload payload, JsonTableGroupScan jTabGrpScan, RelNode scanRel, RelDataType rowType) {
if (condition != null
&& !condition.isAlwaysTrue()) {
RexBuilder builder = scanRel.getCluster().getRexBuilder();
PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster());
String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
if (statsCache.get(conditionAsStr) == null
&& payload.getRowCount() != Statistics.ROWCOUNT_UNKNOWN) {
Map<String, StatisticsPayload> payloadMap = new HashMap<>();
payloadMap.put(buildUniqueIndexIdentifier(idx), payload);
statsCache.put(conditionAsStr, payloadMap);
logger.debug("Statistics: StatsCache:<{}, {}>",conditionAsStr, payload);
// Always pre-process CAST conditions - Otherwise queryCondition will not be generated correctly
RexNode preProcIdxCondition = convertToStatsCondition(condition, idx, context, scanRel,
Arrays.asList(SqlKind.CAST));
QueryCondition queryCondition =
jTabGrpScan.convertToQueryCondition(convertToLogicalExpression(preProcIdxCondition,
rowType, settings, builder));
if (queryCondition != null) {
String queryConditionAsStr = queryCondition.toString();
if (conditionRexNodeMap.get(queryConditionAsStr) == null) {
conditionRexNodeMap.put(queryConditionAsStr, conditionAsStr);
logger.debug("Statistics: QCRNCache:<{}, {}>",queryConditionAsStr, conditionAsStr);
}
} else {
logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr);
logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr);
}
} else {
Map<String, StatisticsPayload> payloadMap = statsCache.get(conditionAsStr);
if (payloadMap != null) {
if (payloadMap.get(buildUniqueIndexIdentifier(idx)) == null) {
payloadMap.put(buildUniqueIndexIdentifier(idx), payload);
// rowCount for the same condition should be the same on primary table or index,
// let us sync them to the smallest since currently both are over-estimated.
// DO NOT sync the leading rowCount since it is based on the leading condition and not the
// condition (key for this cache). Hence, for the same condition the leading condition and
// consequently the leading rowCount will vary with the index. Syncing them may lead to
// unintended side-effects e.g. given a covering index and full table scan and a condition
// on a non-id field which happens to be the leading key in the index, the leading rowcount
// for the full table scan should be the full table rowcount. Syncing them would incorrectly
// make the full table scan cheaper! If required, syncing should be only done based on
// leading condition and NOT the condition
double minimalRowCount = payload.getRowCount();
for (StatisticsPayload existing : payloadMap.values()) {
if (existing.getRowCount() < minimalRowCount) {
minimalRowCount = existing.getRowCount();
}
}
for (StatisticsPayload existing : payloadMap.values()) {
if (existing instanceof MapRDBStatisticsPayload) {
((MapRDBStatisticsPayload)existing).rowCount = minimalRowCount;
}
}
} else {
logger.debug("Statistics: Filter row count already exists for filter: {}. Skip!", conditionAsStr);
}
} else {
logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
}
}
} else if (condition == null && idx == null) {
fullTableScanPayload = new MapRDBStatisticsPayload(payload.getRowCount(),
payload.getLeadingRowCount(), payload.getAvgRowSize());
logger.debug("Statistics: StatsCache:<{}, {}>","NULL", fullTableScanPayload);
}
}
private void addToCache(IndexDescriptor idx, StatisticsPayload payload, StatisticsPayload ftsPayload) {
String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
if (fIStatsCache.get(tabIdxIdentifier) == null) {
if (ftsPayload.getAvgRowSize() >= payload.getAvgRowSize()) {
fIStatsCache.put(tabIdxIdentifier, payload);
logger.debug("Statistics: fIStatsCache:<{}, {}>",tabIdxIdentifier, payload);
} else {
StatisticsPayload cappedPayload =
new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, ftsPayload.getAvgRowSize());
fIStatsCache.put(tabIdxIdentifier,cappedPayload);
logger.debug("Statistics: fIStatsCache:<{}, {}> (Capped)",tabIdxIdentifier, cappedPayload);
}
} else {
logger.debug("Statistics: Average row size already exists for :<{}, {}>. Skip!",tabIdxIdentifier, payload);
}
}
/*
* Convert the given RexNode to a String representation while also replacing the RexInputRef references
* to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode
* expressions may differ in the RexInputRef positions but otherwise the same.
* e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State)
*/
private String convertRexToString(RexNode condition, RelDataType rowType) {
StringBuilder sb = new StringBuilder();
if (condition == null) {
return null;
}
if (condition.getKind() == SqlKind.AND) {
boolean first = true;
for(RexNode pred : RelOptUtil.conjunctions(condition)) {
if (first) {
sb.append(convertRexToString(pred, rowType));
first = false;
} else {
sb.append(" " + SqlKind.AND.toString() + " ");
sb.append(convertRexToString(pred, rowType));
}
}
return sb.toString();
} else if (condition.getKind() == SqlKind.OR) {
boolean first = true;
for(RexNode pred : RelOptUtil.disjunctions(condition)) {
if (first) {
sb.append(convertRexToString(pred, rowType));
first = false;
} else {
sb.append(" " + SqlKind.OR.toString() + " ");
sb.append(convertRexToString(pred, rowType));
}
}
return sb.toString();
} else {
HashMap<String, String> inputRefMapping = new HashMap<>();
/* Based on the rel projection the input reference for the same column may change
* during planning. We want the cache to be agnostic to it. Hence, the entry stored
* in the cache has the input reference ($i) replaced with the column name
*/
getInputRefMapping(condition, rowType, inputRefMapping);
if (inputRefMapping.keySet().size() > 0) {
//Found input ref - replace it
String replCondition = condition.toString();
for (String inputRef : inputRefMapping.keySet()) {
replCondition = replCondition.replace(inputRef, inputRefMapping.get(inputRef));
}
return replCondition;
} else {
return condition.toString();
}
}
}
/*
* Generate the input reference to column mapping for reference replacement. Please
* look at the usage in convertRexToString() to understand why this mapping is required.
*/
private void getInputRefMapping(RexNode condition, RelDataType rowType,
HashMap<String, String> mapping) {
if (condition instanceof RexCall) {
for (RexNode op : ((RexCall) condition).getOperands()) {
getInputRefMapping(op, rowType, mapping);
}
} else if (condition instanceof RexInputRef) {
mapping.put(condition.toString(),
rowType.getFieldNames().get(condition.hashCode()));
}
}
/*
* Additional pre-processing may be required for LIKE/CAST predicates in order to compute statistics.
* e.g. A LIKE predicate should be converted to a RANGE predicate for statistics computation. MapR-DB
* does not yet support computing statistics for LIKE predicates.
*/
private RexNode convertToStatsCondition(RexNode condition, IndexDescriptor index,
IndexCallContext context, RelNode scanRel, List<SqlKind>typesToProcess) {
RexBuilder builder = scanRel.getCluster().getRexBuilder();
if (condition.getKind() == SqlKind.AND) {
final List<RexNode> conditions = Lists.newArrayList();
for(RexNode pred : RelOptUtil.conjunctions(condition)) {
conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess));
}
return RexUtil.composeConjunction(builder, conditions, false);
} else if (condition.getKind() == SqlKind.OR) {
final List<RexNode> conditions = Lists.newArrayList();
for(RexNode pred : RelOptUtil.disjunctions(condition)) {
conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess));
}
return RexUtil.composeDisjunction(builder, conditions, false);
} else if (condition instanceof RexCall) {
// LIKE operator - convert to a RANGE predicate, if possible
if (typesToProcess.contains(SqlKind.LIKE)
&& ((RexCall) condition).getOperator().getKind() == SqlKind.LIKE) {
return convertLikeToRange((RexCall)condition, builder);
} else if (typesToProcess.contains(SqlKind.CAST)
&& hasCastExpression(condition)) {
return convertCastForFIdx(((RexCall) condition), index, context, scanRel);
}
else {
return condition;
}
}
return condition;
}
/*
* Determines whether the given expression contains a CAST expression. Assumes that the
* given expression is a valid expression.
* Returns TRUE, if it finds at least one instance of CAST operator.
*/
private boolean hasCastExpression(RexNode condition) {
if (condition instanceof RexCall) {
if (((RexCall) condition).getOperator().getKind() == SqlKind.CAST) {
return true;
}
for (RexNode op : ((RexCall) condition).getOperands()) {
if (hasCastExpression(op)) {
return true;
}
}
}
return false;
}
/*
* CAST expressions are not understood by MAPR-DB as-is. Hence, we must convert them before passing them
* onto MAPR-DB for statistics. Given a functional index, the given expression is converted into an
* expression on the `expression` column of the functional index.
*/
private RexNode convertCastForFIdx(RexCall condition, IndexDescriptor index,
IndexCallContext context, RelNode origScan) {
if (index == null) {
return condition;
}
FunctionalIndexInfo functionInfo = index.getFunctionalInfo();
if (!functionInfo.hasFunctional()) {
return condition;
}
// The functional index has a different row-type than the original scan. Use the index row-type when
// converting the condition
RelDataType newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(origScan, context, functionInfo);
RexBuilder builder = origScan.getCluster().getRexBuilder();
return FunctionalIndexHelper.convertConditionForIndexScan(condition,
origScan, newRowType, builder, functionInfo);
}
/*
* Helper function to perform additional pre-processing for LIKE predicates
*/
private RexNode convertLikeToRange(RexCall condition, RexBuilder builder) {
Preconditions.checkArgument(condition.getOperator().getKind() == SqlKind.LIKE,
"Unable to convertLikeToRange: argument is not a LIKE condition!");
HBaseRegexParser parser = null;
RexNode arg = null;
RexLiteral pattern = null, escape = null;
String patternStr = null, escapeStr = null;
if (condition.getOperands().size() == 2) {
// No escape character specified
for (RexNode op : condition.getOperands()) {
if (op.getKind() == SqlKind.LITERAL) {
pattern = (RexLiteral) op;
} else {
arg = op;
}
}
// Get the PATTERN strings from the corresponding RexLiteral
if (pattern.getTypeName() == SqlTypeName.DECIMAL ||
pattern.getTypeName() == SqlTypeName.INTEGER) {
patternStr = pattern.getValue().toString();
} else if (pattern.getTypeName() == SqlTypeName.CHAR) {
patternStr = pattern.getValue2().toString();
}
if (patternStr != null) {
parser = new HBaseRegexParser(patternStr);
}
} else if (condition.getOperands().size() == 3) {
// Escape character specified
for (RexNode op : condition.getOperands()) {
if (op.getKind() == SqlKind.LITERAL) {
// Assume first literal specifies PATTERN and the second literal specifies the ESCAPE char
if (pattern == null) {
pattern = (RexLiteral) op;
} else {
escape = (RexLiteral) op;
}
} else {
arg = op;
}
}
// Get the PATTERN and ESCAPE strings from the corresponding RexLiteral
if (pattern.getTypeName() == SqlTypeName.DECIMAL ||
pattern.getTypeName() == SqlTypeName.INTEGER) {
patternStr = pattern.getValue().toString();
} else if (pattern.getTypeName() == SqlTypeName.CHAR) {
patternStr = pattern.getValue2().toString();
}
if (escape.getTypeName() == SqlTypeName.DECIMAL ||
escape.getTypeName() == SqlTypeName.INTEGER) {
escapeStr = escape.getValue().toString();
} else if (escape.getTypeName() == SqlTypeName.CHAR) {
escapeStr = escape.getValue2().toString();
}
if (patternStr != null && escapeStr != null) {
parser = new HBaseRegexParser(patternStr, escapeStr.toCharArray()[0]);
}
}
if (parser != null) {
parser.parse();
String prefix = parser.getPrefixString();
/*
* If there is a literal prefix, convert it into an EQUALITY or RANGE predicate
*/
if (prefix != null) {
if (prefix.equals(parser.getLikeString())) {
// No WILDCARD present. This turns the LIKE predicate to EQUALITY predicate
if (arg != null) {
return builder.makeCall(SqlStdOperatorTable.EQUALS, arg, pattern);
}
} else {
// WILDCARD present. This turns the LIKE predicate to RANGE predicate
byte[] startKey = HConstants.EMPTY_START_ROW;
byte[] stopKey = HConstants.EMPTY_END_ROW;
startKey = prefix.getBytes(Charsets.UTF_8);
stopKey = startKey.clone();
boolean isMaxVal = true;
for (int i = stopKey.length - 1; i >= 0; --i) {
int nextByteValue = (0xff & stopKey[i]) + 1;
if (nextByteValue < 0xff) {
stopKey[i] = (byte) nextByteValue;
isMaxVal = false;
break;
} else {
stopKey[i] = 0;
}
}
if (isMaxVal) {
stopKey = HConstants.EMPTY_END_ROW;
}
// TODO: This maybe a potential bug since we assume UTF-8 encoding. However, we follow the
// current DB implementation. See HBaseFilterBuilder.createHBaseScanSpec "like" CASE statement
RexLiteral startKeyLiteral = builder.makeLiteral(new String(startKey,
Charsets.UTF_8));
RexLiteral stopKeyLiteral = builder.makeLiteral(new String(stopKey,
Charsets.UTF_8));
if (arg != null) {
RexNode startPred = builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
arg, startKeyLiteral);
RexNode stopPred = builder.makeCall(SqlStdOperatorTable.LESS_THAN, arg, stopKeyLiteral);
return builder.makeCall(SqlStdOperatorTable.AND, startPred, stopPred);
}
}
}
}
// Could not convert - return condition as-is.
return condition;
}
/*
* Compute the selectivity of the given rowCondition. Retrieve the selectivity
* for index conditions from the cache
*/
private Pair<Double, Boolean> computeSelectivity(RexNode condition, IndexDescriptor idx, double totalRows,
RelNode scanRel, Map<String, Double> baseConditionMap) {
double selectivity;
boolean guess = false;
if (totalRows <= 0) {
return new Pair<>(1.0, true);
}
String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
if (condition.getKind() == SqlKind.AND) {
selectivity = 1.0;
for (RexNode pred : RelOptUtil.conjunctions(condition)) {
Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap);
if (selPayload.left > 0) {
// At least one AND branch is a guess
if (selPayload.right == true) {
guess = true;
}
selectivity *= selPayload.left;
}
}
} else if (condition.getKind() == SqlKind.OR) {
selectivity = 0.0;
for (RexNode pred : RelOptUtil.disjunctions(condition)) {
Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap);
if (selPayload.left > 0.0) {
// At least one OR branch is a guess
if (selPayload.right == true) {
guess = true;
}
selectivity += selPayload.left;
}
}
// Cap selectivity of OR'ed predicates at 0.25 if at least one predicate is a guess (Calcite does the same)
if (guess && selectivity > 0.25) {
selectivity = 0.25;
}
} else {
guess = false;
if (baseConditionMap.get(conditionAsStr) != null) {
double rowCount = baseConditionMap.get(conditionAsStr);
if (rowCount != -1.0) {
selectivity = rowCount / totalRows;
} else {
// Ignore
selectivity = -1.0;
guess = true;
}
} else {
selectivity = RelMdUtil.guessSelectivity(condition);
guess = true;
}
return new Pair<>(selectivity, guess);
}
// Cap selectivity to be between 0.0 and 1.0
selectivity = Math.min(1.0, selectivity);
selectivity = Math.max(0.0, selectivity);
logger.debug("Statistics: computeSelectivity: Cache MISS: Computed {} -> {}", conditionAsStr, selectivity);
return new Pair<>(selectivity, guess);
}
/*
* Filters out indexes from the given collection based on the row key of indexes i.e. after filtering
* the given collection would contain only one index for each distinct row key in the collection
*/
private IndexCollection distinctFKeyIndexes(IndexCollection indexes, RelNode scanRel) {
IndexCollection distinctIdxCollection = new DrillIndexCollection(scanRel, new HashSet<DrillIndexDescriptor>());
Iterator<IndexDescriptor> iterator = indexes.iterator();
Map<String, List<IndexDescriptor>> firstColIndexMap = new HashMap<>();
while (iterator.hasNext()) {
IndexDescriptor index = iterator.next();
// If index has columns - the first column is the leading column for the index
if (index.getIndexColumns() != null) {
List<IndexDescriptor> idxList;
String firstCol = convertLExToStr(index.getIndexColumns().get(0));
if (firstColIndexMap.get(firstCol) != null) {
idxList = firstColIndexMap.get(firstCol);
} else {
idxList = new ArrayList<>();
}
idxList.add(index);
firstColIndexMap.put(firstCol, idxList);
}
}
for (String firstCol : firstColIndexMap.keySet()) {
List<IndexDescriptor> indexesSameFirstCol = firstColIndexMap.get(firstCol);
double maxAvgRowSize = -1.0;
IndexDescriptor selectedIdx = null;
for (IndexDescriptor idx : indexesSameFirstCol) {
String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
double idxRowSize = fIStatsCache.get(tabIdxIdentifier).getAvgRowSize();
// Prefer index with largest average row-size, breaking ties lexicographically
if (idxRowSize > maxAvgRowSize
|| (idxRowSize == maxAvgRowSize
&& (selectedIdx == null || idx.getIndexName().compareTo(selectedIdx.getIndexName()) < 0))) {
maxAvgRowSize = idxRowSize;
selectedIdx = idx;
}
}
assert (selectedIdx != null);
distinctIdxCollection.addIndex(selectedIdx);
}
return distinctIdxCollection;
}
/*
* Returns the String representation for the given Logical Expression
*/
private String convertLExToStr(LogicalExpression lex) {
StringBuilder sb = new StringBuilder();
ExpressionStringBuilder esb = new ExpressionStringBuilder();
lex.accept(esb, sb);
return sb.toString();
}
/*
* Converts the given RexNode condition into a Drill logical expression.
*/
private LogicalExpression convertToLogicalExpression(RexNode condition,
RelDataType type, PlannerSettings settings, RexBuilder builder) {
LogicalExpression conditionExp;
try {
conditionExp = DrillOptiq.toDrill(new DrillParseContext(settings), type, builder, condition);
} catch (ClassCastException e) {
return null;
}
return conditionExp;
}
}