| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.planner.index; |
| |
| import org.apache.calcite.plan.RelOptUtil; |
| import org.apache.calcite.rel.RelNode; |
| import org.apache.calcite.rel.metadata.RelMdUtil; |
| import org.apache.calcite.rel.type.RelDataType; |
| import org.apache.calcite.rex.RexBuilder; |
| import org.apache.calcite.rex.RexCall; |
| import org.apache.calcite.rex.RexLiteral; |
| import org.apache.calcite.rex.RexNode; |
| import org.apache.calcite.rex.RexInputRef; |
| import org.apache.calcite.rex.RexUtil; |
| import org.apache.calcite.sql.SqlKind; |
| import org.apache.calcite.sql.fun.SqlStdOperatorTable; |
| import org.apache.calcite.sql.type.SqlTypeName; |
| import org.apache.calcite.util.Pair; |
| import org.apache.drill.common.expression.ExpressionStringBuilder; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.exec.physical.base.DbGroupScan; |
| import org.apache.drill.exec.physical.base.GroupScan; |
| import org.apache.drill.exec.planner.common.DrillScanRelBase; |
| import org.apache.drill.exec.planner.logical.DrillOptiq; |
| import org.apache.drill.exec.planner.logical.DrillParseContext; |
| import org.apache.drill.exec.planner.logical.DrillScanRel; |
| import org.apache.drill.exec.planner.physical.PlannerSettings; |
| import org.apache.drill.exec.planner.physical.PrelUtil; |
| import org.apache.drill.exec.planner.physical.ScanPrel; |
| import org.apache.drill.exec.store.hbase.HBaseRegexParser; |
| import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.ojai.store.QueryCondition; |
| |
| import java.util.Arrays; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import com.google.common.base.Charsets; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| |
| public class MapRDBStatistics implements Statistics { |
| static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBStatistics.class); |
| static final String nullConditionAsString = "<NULL>"; |
| private double rowKeyJoinBackIOFactor = 1.0; |
| private boolean statsAvailable = false; |
| private StatisticsPayload fullTableScanPayload = null; |
/*
 * The computed statistics are cached in <statsCache> so that any subsequent calls are returned
 * from the cache. The <statsCache> is a map of <RexNode, map<Index, Stats Payload>>. The <RexNode>
 * does not have a comparator so it is converted to a String for serving as a Map key. This may result
 * in logically equivalent conditions being treated as different keys, e.g. "sal<10 OR sal>100" vs.
 * "sal>100 OR sal<10". The second (inner) map maintains statistics per index, as not all statistics
 * are independent of the index, e.g. average row size.
 */
| private Map<String, Map<String, StatisticsPayload>> statsCache; |
| /* |
| * The filter independent computed statistics are cached in <fIStatsCache> so that any subsequent |
| * calls are returned from the cache. The <fIStatsCache> is a map of <Index, Stats Payload>. This |
| * cache maintains statistics per index as not all statistics are independent of the index |
| * e.g. average row size. |
| */ |
| private Map<String, StatisticsPayload> fIStatsCache; |
/*
 * The mapping between <QueryCondition> and <RexNode> is kept in <conditionRexNodeMap>. This mapping
 * is useful to obtain the rowCount for a condition specified as a <QueryCondition>, which is required
 * during physical planning. Again, both the <QueryCondition> and the <RexNode> are converted to
 * Strings for the lack of a comparator.
 */
| private Map<String, String> conditionRexNodeMap; |
| |
/**
 * Creates a statistics holder whose three caches (filter statistics, filter-independent
 * statistics and the QueryCondition-to-RexNode mapping) all start out empty.
 */
public MapRDBStatistics() {
  this.statsCache = Maps.newHashMap();
  this.fIStatsCache = Maps.newHashMap();
  this.conditionRexNodeMap = Maps.newHashMap();
}
| |
| public double getRowKeyJoinBackIOFactor() { |
| return rowKeyJoinBackIOFactor; |
| } |
| |
| @Override |
| public boolean isStatsAvailable() { |
| return statsAvailable; |
| } |
| |
| @Override |
| public String buildUniqueIndexIdentifier(IndexDescriptor idx) { |
| if (idx == null) { |
| return null; |
| } else { |
| return idx.getTableName() + "_" + idx.getIndexName(); |
| } |
| } |
| |
| public String buildUniqueIndexIdentifier(String tableName, String idxName) { |
| if (tableName == null || idxName == null) { |
| return null; |
| } else { |
| return tableName + "_" + idxName; |
| } |
| } |
| |
| @Override |
| /** Returns the number of rows satisfying the given FILTER condition |
| * @param condition - FILTER specified as a {@link RexNode} |
| * @param tabIdxName - The table/index identifier |
| * @return approximate rows satisfying the filter |
| */ |
| public double getRowCount(RexNode condition, String tabIdxName, RelNode scanRel) { |
| String conditionAsStr = nullConditionAsString; |
| Map<String, StatisticsPayload> payloadMap; |
| if ((scanRel instanceof DrillScanRel && ((DrillScanRel)scanRel).getGroupScan() instanceof DbGroupScan) |
| || (scanRel instanceof ScanPrel && ((ScanPrel)scanRel).getGroupScan() instanceof DbGroupScan)) { |
| if (condition == null && fullTableScanPayload != null) { |
| return fullTableScanPayload.getRowCount(); |
| } else if (condition != null) { |
| conditionAsStr = convertRexToString(condition, scanRel.getRowType()); |
| payloadMap = statsCache.get(conditionAsStr); |
| if (payloadMap != null) { |
| if (payloadMap.get(tabIdxName) != null) { |
| return payloadMap.get(tabIdxName).getRowCount(); |
| } else { |
| // We might not have computed rowcount for the given condition from the tab/index in question. |
| // For rowcount it does not matter which index was used to get the rowcount for the given condition. |
| // Hence, just use the first one! |
| for (String payloadKey : payloadMap.keySet()) { |
| if (payloadKey != null && payloadMap.get(payloadKey) != null) { |
| return payloadMap.get(payloadKey).getRowCount(); |
| } |
| } |
| StatisticsPayload anyPayload = payloadMap.entrySet().iterator().next().getValue(); |
| return anyPayload.getRowCount(); |
| } |
| } |
| } |
| } |
| if (statsAvailable) { |
| logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr); |
| } |
| return ROWCOUNT_UNKNOWN; |
| } |
| |
| /** Returns the number of rows satisfying the given FILTER condition |
| * @param condition - FILTER specified as a {@link QueryCondition} |
| * @param tabIdxName - The table/index identifier |
| * @return approximate rows satisfying the filter |
| */ |
| public double getRowCount(QueryCondition condition, String tabIdxName) { |
| String conditionAsStr = nullConditionAsString; |
| Map<String, StatisticsPayload> payloadMap; |
| if (condition != null |
| && conditionRexNodeMap.get(condition.toString()) != null) { |
| String rexConditionAsString = conditionRexNodeMap.get(condition.toString()); |
| payloadMap = statsCache.get(rexConditionAsString); |
| if (payloadMap != null) { |
| if (payloadMap.get(tabIdxName) != null) { |
| return payloadMap.get(tabIdxName).getRowCount(); |
| } else { |
| // We might not have computed rowcount for the given condition from the tab/index in question. |
| // For rowcount it does not matter which index was used to get the rowcount for the given condition. |
| // if tabIdxName is null, most likely we have found one from payloadMap and won't come to here. |
| // If we come to here, we are looking for payload for an index, so let us use any index's payload first! |
| for (String payloadKey : payloadMap.keySet()) { |
| if (payloadKey != null && payloadMap.get(payloadKey) != null) { |
| return payloadMap.get(payloadKey).getRowCount(); |
| } |
| } |
| StatisticsPayload anyPayload = payloadMap.entrySet().iterator().next().getValue(); |
| return anyPayload.getRowCount(); |
| } |
| } |
| } else if (condition == null |
| && fullTableScanPayload != null) { |
| return fullTableScanPayload.getRowCount(); |
| } |
| if (condition != null) { |
| conditionAsStr = condition.toString(); |
| } |
| if (statsAvailable) { |
| logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr); |
| } |
| return ROWCOUNT_UNKNOWN; |
| } |
| |
| /** Returns the number of leading rows satisfying the given FILTER condition |
| * @param condition - FILTER specified as a {@link RexNode} |
| * @param tabIdxName - The table/index identifier |
| * @param scanRel - The current scanRel |
| * @return approximate rows satisfying the leading filter |
| */ |
| @Override |
| public double getLeadingRowCount(RexNode condition, String tabIdxName, DrillScanRelBase scanRel) { |
| String conditionAsStr = nullConditionAsString; |
| Map<String, StatisticsPayload> payloadMap; |
| if ((scanRel instanceof DrillScanRel && ((DrillScanRel)scanRel).getGroupScan() instanceof DbGroupScan) |
| || (scanRel instanceof ScanPrel && ((ScanPrel)scanRel).getGroupScan() instanceof DbGroupScan)) { |
| if (condition == null && fullTableScanPayload != null) { |
| return fullTableScanPayload.getLeadingRowCount(); |
| } else if (condition != null) { |
| conditionAsStr = convertRexToString(condition, scanRel.getRowType()); |
| payloadMap = statsCache.get(conditionAsStr); |
| if (payloadMap != null) { |
| if (payloadMap.get(tabIdxName) != null) { |
| return payloadMap.get(tabIdxName).getLeadingRowCount(); |
| } |
| // Unlike rowcount, leading rowcount is dependent on the index. So, if tab/idx is |
| // not found, we are out of luck! |
| } |
| } |
| } |
| if (statsAvailable) { |
| logger.debug("Statistics: Leading filter row count is UNKNOWN for filter: {}", conditionAsStr); |
| } |
| return ROWCOUNT_UNKNOWN; |
| } |
| |
| /** Returns the number of leading rows satisfying the given FILTER condition |
| * @param condition - FILTER specified as a {@link QueryCondition} |
| * @param tabIdxName - The table/index identifier |
| * @return approximate rows satisfying the leading filter |
| */ |
| public double getLeadingRowCount(QueryCondition condition, String tabIdxName) { |
| String conditionAsStr = nullConditionAsString; |
| Map<String, StatisticsPayload> payloadMap; |
| if (condition != null |
| && conditionRexNodeMap.get(condition.toString()) != null) { |
| String rexConditionAsString = conditionRexNodeMap.get(condition.toString()); |
| payloadMap = statsCache.get(rexConditionAsString); |
| if (payloadMap != null) { |
| if (payloadMap.get(tabIdxName) != null) { |
| return payloadMap.get(tabIdxName).getLeadingRowCount(); |
| } |
| // Unlike rowcount, leading rowcount is dependent on the index. So, if tab/idx is |
| // not found, we are out of luck! |
| } |
| } else if (condition == null |
| && fullTableScanPayload != null) { |
| return fullTableScanPayload.getLeadingRowCount(); |
| } |
| if (condition != null) { |
| conditionAsStr = condition.toString(); |
| } |
| if (statsAvailable) { |
| logger.debug("Statistics: Leading filter row count is UNKNOWN for filter: {}", conditionAsStr); |
| } |
| return ROWCOUNT_UNKNOWN; |
| } |
| |
| @Override |
| public double getAvgRowSize(String tabIdxName, boolean isTableScan) { |
| StatisticsPayload payloadMap; |
| if (isTableScan && fullTableScanPayload != null) { |
| return fullTableScanPayload.getAvgRowSize(); |
| } else if (!isTableScan) { |
| payloadMap = fIStatsCache.get(tabIdxName); |
| if (payloadMap != null) { |
| return payloadMap.getAvgRowSize(); |
| } |
| } |
| if (statsAvailable) { |
| logger.debug("Statistics: Average row size is UNKNOWN for table: {}", tabIdxName); |
| } |
| return AVG_ROWSIZE_UNKNOWN; |
| } |
| |
| public boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context) { |
| GroupScan scan = IndexPlanUtils.getGroupScan(scanRel); |
| |
| PlannerSettings settings = PrelUtil.getPlannerSettings(scanRel.getCluster().getPlanner()); |
| rowKeyJoinBackIOFactor = settings.getIndexRowKeyJoinCostFactor(); |
| if (scan instanceof DbGroupScan) { |
| String conditionAsStr = convertRexToString(condition, scanRel.getRowType()); |
| if (statsCache.get(conditionAsStr) == null) { |
| IndexCollection indexes = ((DbGroupScan)scan).getSecondaryIndexCollection(scanRel); |
| populateStats(condition, indexes, scanRel, context); |
| logger.info("index_plan_info: initialize: scanRel #{} and groupScan {} got fulltable {}, statsCache: {}, fiStatsCache: {}", |
| scanRel.getId(), System.identityHashCode(scan), fullTableScanPayload, statsCache, fIStatsCache); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * This function computes statistics when there is no query condition |
| * @param jTabGrpScan - The current group scan |
| * @param indexes - The collection of indexes to use for getting statistics |
| * @param scanRel - The current scanRel |
| * @param context - The index plan call context |
| */ |
| private void populateStatsForNoFilter(JsonTableGroupScan jTabGrpScan, IndexCollection indexes, RelNode scanRel, |
| IndexCallContext context) { |
| // Get the stats payload for full table (has total rows in the table) |
| StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel); |
| addToCache(null, null, context, ftsPayload, jTabGrpScan, scanRel, scanRel.getRowType()); |
| addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload); |
| // Get the stats for all indexes |
| for (IndexDescriptor idx: indexes) { |
| StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, idx, scanRel); |
| StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx); |
| RelDataType newRowType; |
| FunctionalIndexInfo functionInfo = idx.getFunctionalInfo(); |
| if (functionInfo.hasFunctional()) { |
| newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo); |
| } else { |
| newRowType = scanRel.getRowType(); |
| } |
| addToCache(null, idx, context, idxPayload, jTabGrpScan, scanRel, newRowType); |
| addToCache(idx, idxRowSizePayload, ftsPayload); |
| } |
| } |
| |
| /** |
| * This is the core statistics function for populating the statistics. The statistics populated correspond to the query |
| * condition. Based on different types of plans, we would need statistics for different combinations of predicates. Currently, |
| * we do not have a tree-walker for {@link QueryCondition}. Hence, instead of using the individual predicates stats, to construct |
| * the stats for the overall predicates, we rely on using the final predicates. Hence, this has a limitation(susceptible) to |
| * predicate modification post stats generation. Statistics computed/stored are rowcounts, leading rowcounts, average rowsize. |
| * Rowcounts and leading rowcounts (i.e. corresponding to predicates on the leading index columns) are stored in the statsCache. |
| * Average rowsizes are stored in the fiStatsCache (FI stands for Filter Independent). |
| * |
| * @param condition - The condition for which to obtain statistics |
| * @param indexes - The collection of indexes to use for getting statistics |
| * @param scanRel - The current scanRel |
| * @param context - The index plan call context |
| */ |
| private void populateStats(RexNode condition, IndexCollection indexes, DrillScanRelBase scanRel, |
| IndexCallContext context) { |
| JsonTableGroupScan jTabGrpScan; |
| Map<IndexDescriptor, IndexConditionInfo> firstKeyIdxConditionMap; |
| Map<IndexDescriptor, IndexConditionInfo> idxConditionMap; |
| /* Map containing the individual base conditions of an ANDed/ORed condition and their selectivities. |
| * This is used to compute the overall selectivity of a complex ANDed/ORed condition using its base |
| * conditions. Helps prevent over/under estimates and guessed selectivity for ORed predicates. |
| */ |
| Map<String, Double> baseConditionMap; |
| GroupScan grpScan = IndexPlanUtils.getGroupScan(scanRel); |
| |
| if ((scanRel instanceof DrillScanRel || scanRel instanceof ScanPrel) && |
| grpScan instanceof JsonTableGroupScan) { |
| jTabGrpScan = (JsonTableGroupScan) grpScan; |
| } else { |
| logger.debug("Statistics: populateStats exit early - not an instance of JsonTableGroupScan!"); |
| return; |
| } |
| if (condition == null) { |
| populateStatsForNoFilter(jTabGrpScan, indexes, scanRel, context); |
| statsAvailable = true; |
| return; |
| } |
| |
| RexBuilder builder = scanRel.getCluster().getRexBuilder(); |
| PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster()); |
| // Get the stats payload for full table (has total rows in the table) |
| StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel); |
| |
| // Get the average row size for table and all indexes |
| addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload); |
| if (ftsPayload == null || ftsPayload.getRowCount() == 0) { |
| return; |
| } |
| for (IndexDescriptor idx : indexes) { |
| StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx); |
| addToCache(idx, idxRowSizePayload, ftsPayload); |
| } |
| |
| /* Only use indexes with distinct first key */ |
| IndexCollection distFKeyIndexes = distinctFKeyIndexes(indexes, scanRel); |
| IndexConditionInfo.Builder infoBuilder = IndexConditionInfo.newBuilder(condition, |
| distFKeyIndexes, builder, scanRel); |
| idxConditionMap = infoBuilder.getIndexConditionMap(); |
| firstKeyIdxConditionMap = infoBuilder.getFirstKeyIndexConditionMap(); |
| baseConditionMap = new HashMap<>(); |
| for (IndexDescriptor idx : firstKeyIdxConditionMap.keySet()) { |
| if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) { |
| continue; |
| } |
| RexNode idxCondition = firstKeyIdxConditionMap.get(idx).indexCondition; |
| /* Use the pre-processed condition only for getting actual statistic from MapR-DB APIs. Use the |
| * original condition everywhere else (cache store/lookups) since the RexNode condition and its |
| * corresponding QueryCondition will be used to get statistics. e.g. we convert LIKE into RANGE |
| * condition to get statistics. However, statistics are always asked for LIKE and NOT the RANGE |
| */ |
| RexNode preProcIdxCondition = convertToStatsCondition(idxCondition, idx, context, scanRel, |
| Arrays.asList(SqlKind.CAST, SqlKind.LIKE)); |
| RelDataType newRowType; |
| FunctionalIndexInfo functionInfo = idx.getFunctionalInfo(); |
| if (functionInfo.hasFunctional()) { |
| newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo); |
| } else { |
| newRowType = scanRel.getRowType(); |
| } |
| |
| QueryCondition queryCondition = jTabGrpScan.convertToQueryCondition( |
| convertToLogicalExpression(preProcIdxCondition, newRowType, settings, builder)); |
| // Cap rows/size at total rows in case of issues with DB APIs |
| StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(queryCondition, idx, scanRel); |
| double rowCount = Math.min(idxPayload.getRowCount(), ftsPayload.getRowCount()); |
| double leadingRowCount = Math.min(idxPayload.getLeadingRowCount(), rowCount); |
| double avgRowSize = Math.min(idxPayload.getAvgRowSize(), ftsPayload.getAvgRowSize()); |
| StatisticsPayload payload = new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize); |
| addToCache(idxCondition, idx, context, payload, jTabGrpScan, scanRel, newRowType); |
| addBaseConditions(idxCondition, payload, false, baseConditionMap, scanRel.getRowType()); |
| } |
| /* Add the row count for index conditions on all indexes. Stats are only computed for leading |
| * keys but index conditions can be pushed and would be required for access path costing |
| */ |
| for (IndexDescriptor idx : idxConditionMap.keySet()) { |
| if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) { |
| continue; |
| } |
| Map<LogicalExpression, RexNode> leadingPrefixMap = Maps.newHashMap(); |
| double rowCount, leadingRowCount, avgRowSize; |
| RexNode idxCondition = idxConditionMap.get(idx).indexCondition; |
| // Ignore conditions which always evaluate to true |
| if (idxCondition.isAlwaysTrue()) { |
| continue; |
| } |
| RexNode idxIncColCondition = idxConditionMap.get(idx).remainderCondition; |
| RexNode idxRemColCondition = IndexPlanUtils.getLeadingPrefixMap(leadingPrefixMap, idx.getIndexColumns(), infoBuilder, idxCondition); |
| RexNode idxLeadColCondition = IndexPlanUtils.getLeadingColumnsFilter( |
| IndexPlanUtils.getLeadingFilters(leadingPrefixMap, idx.getIndexColumns()), builder); |
| RexNode idxTotRemColCondition = IndexPlanUtils.getTotalRemainderFilter(idxRemColCondition, idxIncColCondition, builder); |
| RexNode idxTotColCondition = IndexPlanUtils.getTotalFilter(idxLeadColCondition, idxTotRemColCondition, builder); |
| FunctionalIndexInfo functionInfo = idx.getFunctionalInfo(); |
| RelDataType newRowType = scanRel.getRowType(); |
| if (functionInfo.hasFunctional()) { |
| newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo); |
| } |
| /* For non-covering plans we would need the index leading condition */ |
| rowCount = ftsPayload.getRowCount() * computeSelectivity(idxLeadColCondition, idx, |
| ftsPayload.getRowCount(), scanRel, baseConditionMap).left; |
| leadingRowCount = rowCount; |
| avgRowSize = fIStatsCache.get(buildUniqueIndexIdentifier(idx)).getAvgRowSize(); |
| addToCache(idxLeadColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize), |
| jTabGrpScan, scanRel, newRowType); |
| /* For covering plans we would need the full condition */ |
| rowCount = ftsPayload.getRowCount() * computeSelectivity(idxTotColCondition, idx, |
| ftsPayload.getRowCount(), scanRel, baseConditionMap).left; |
| addToCache(idxTotColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize), |
| jTabGrpScan, scanRel, newRowType); |
| /* For intersect plans we would need the index condition */ |
| rowCount = ftsPayload.getRowCount() * computeSelectivity(idxCondition, idx, |
| ftsPayload.getRowCount(), scanRel, baseConditionMap).left; |
| addToCache(idxCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize), |
| jTabGrpScan, scanRel, newRowType); |
| /* Add the rowCount for condition on only included columns - no leading columns here! */ |
| if (idxIncColCondition != null) { |
| rowCount = ftsPayload.getRowCount() * computeSelectivity(idxIncColCondition, null, |
| ftsPayload.getRowCount(), scanRel, baseConditionMap).left; |
| addToCache(idxIncColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, rowCount, avgRowSize), |
| jTabGrpScan, scanRel, newRowType); |
| } |
| } |
| |
| // Add the rowCount for the complete condition - based on table |
| double rowCount = ftsPayload.getRowCount() * computeSelectivity(condition, null, |
| ftsPayload.getRowCount(), scanRel, baseConditionMap).left; |
| // Here, ftsLeadingKey rowcount is based on _id predicates |
| StatisticsPayload ftsLeadingKeyPayload = jTabGrpScan.getFirstKeyEstimatedStats(jTabGrpScan.convertToQueryCondition( |
| convertToLogicalExpression(condition, scanRel.getRowType(), settings, builder)), null, scanRel); |
| addToCache(condition, null, null, new MapRDBStatisticsPayload(rowCount, ftsLeadingKeyPayload.getRowCount(), |
| ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType()); |
| // Add the full table rows while we are at it - represented by <NULL> RexNode, <NULL> QueryCondition. |
| // No ftsLeadingKey so leadingKeyRowcount = totalRowCount |
| addToCache(null, null, null, new MapRDBStatisticsPayload(ftsPayload.getRowCount(), ftsPayload.getRowCount(), |
| ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType()); |
| // mark stats has been statsAvailable |
| statsAvailable = true; |
| } |
| |
| private boolean addBaseConditions(RexNode condition, StatisticsPayload payload, boolean redundant, |
| Map<String, Double> baseConditionMap, RelDataType rowType) { |
| boolean res = redundant; |
| if (condition.getKind() == SqlKind.AND) { |
| for(RexNode pred : RelOptUtil.conjunctions(condition)) { |
| res = addBaseConditions(pred, payload, res, baseConditionMap, rowType); |
| } |
| } else if (condition.getKind() == SqlKind.OR) { |
| for(RexNode pred : RelOptUtil.disjunctions(condition)) { |
| res = addBaseConditions(pred, payload, res, baseConditionMap, rowType); |
| } |
| } else { |
| // base condition |
| String conditionAsStr = convertRexToString(condition, rowType); |
| if (!redundant) { |
| baseConditionMap.put(conditionAsStr, payload.getRowCount()); |
| return true; |
| } else { |
| baseConditionMap.put(conditionAsStr, -1.0); |
| return false; |
| } |
| } |
| return res; |
| } |
| /* |
| * Adds the statistic(row count) to the cache. Also adds the corresponding QueryCondition->RexNode |
| * condition mapping. |
| */ |
| private void addToCache(RexNode condition, IndexDescriptor idx, IndexCallContext context, |
| StatisticsPayload payload, JsonTableGroupScan jTabGrpScan, RelNode scanRel, RelDataType rowType) { |
| if (condition != null |
| && !condition.isAlwaysTrue()) { |
| RexBuilder builder = scanRel.getCluster().getRexBuilder(); |
| PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster()); |
| String conditionAsStr = convertRexToString(condition, scanRel.getRowType()); |
| if (statsCache.get(conditionAsStr) == null |
| && payload.getRowCount() != Statistics.ROWCOUNT_UNKNOWN) { |
| Map<String, StatisticsPayload> payloadMap = new HashMap<>(); |
| payloadMap.put(buildUniqueIndexIdentifier(idx), payload); |
| statsCache.put(conditionAsStr, payloadMap); |
| logger.debug("Statistics: StatsCache:<{}, {}>",conditionAsStr, payload); |
| // Always pre-process CAST conditions - Otherwise queryCondition will not be generated correctly |
| RexNode preProcIdxCondition = convertToStatsCondition(condition, idx, context, scanRel, |
| Arrays.asList(SqlKind.CAST)); |
| QueryCondition queryCondition = |
| jTabGrpScan.convertToQueryCondition(convertToLogicalExpression(preProcIdxCondition, |
| rowType, settings, builder)); |
| if (queryCondition != null) { |
| String queryConditionAsStr = queryCondition.toString(); |
| if (conditionRexNodeMap.get(queryConditionAsStr) == null) { |
| conditionRexNodeMap.put(queryConditionAsStr, conditionAsStr); |
| logger.debug("Statistics: QCRNCache:<{}, {}>",queryConditionAsStr, conditionAsStr); |
| } |
| } else { |
| logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr); |
| logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr); |
| } |
| } else { |
| Map<String, StatisticsPayload> payloadMap = statsCache.get(conditionAsStr); |
| if (payloadMap != null) { |
| if (payloadMap.get(buildUniqueIndexIdentifier(idx)) == null) { |
| payloadMap.put(buildUniqueIndexIdentifier(idx), payload); |
| |
| // rowCount for the same condition should be the same on primary table or index, |
| // let us sync them to the smallest since currently both are over-estimated. |
| // DO NOT sync the leading rowCount since it is based on the leading condition and not the |
| // condition (key for this cache). Hence, for the same condition the leading condition and |
| // consequently the leading rowCount will vary with the index. Syncing them may lead to |
| // unintended side-effects e.g. given a covering index and full table scan and a condition |
| // on a non-id field which happens to be the leading key in the index, the leading rowcount |
| // for the full table scan should be the full table rowcount. Syncing them would incorrectly |
| // make the full table scan cheaper! If required, syncing should be only done based on |
| // leading condition and NOT the condition |
| double minimalRowCount = payload.getRowCount(); |
| for (StatisticsPayload existing : payloadMap.values()) { |
| if (existing.getRowCount() < minimalRowCount) { |
| minimalRowCount = existing.getRowCount(); |
| } |
| } |
| for (StatisticsPayload existing : payloadMap.values()) { |
| if (existing instanceof MapRDBStatisticsPayload) { |
| ((MapRDBStatisticsPayload)existing).rowCount = minimalRowCount; |
| } |
| } |
| } else { |
| logger.debug("Statistics: Filter row count already exists for filter: {}. Skip!", conditionAsStr); |
| } |
| } else { |
| logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr); |
| } |
| } |
| } else if (condition == null && idx == null) { |
| fullTableScanPayload = new MapRDBStatisticsPayload(payload.getRowCount(), |
| payload.getLeadingRowCount(), payload.getAvgRowSize()); |
| logger.debug("Statistics: StatsCache:<{}, {}>","NULL", fullTableScanPayload); |
| } |
| } |
| |
| private void addToCache(IndexDescriptor idx, StatisticsPayload payload, StatisticsPayload ftsPayload) { |
| String tabIdxIdentifier = buildUniqueIndexIdentifier(idx); |
| if (fIStatsCache.get(tabIdxIdentifier) == null) { |
| if (ftsPayload.getAvgRowSize() >= payload.getAvgRowSize()) { |
| fIStatsCache.put(tabIdxIdentifier, payload); |
| logger.debug("Statistics: fIStatsCache:<{}, {}>",tabIdxIdentifier, payload); |
| } else { |
| StatisticsPayload cappedPayload = |
| new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, ftsPayload.getAvgRowSize()); |
| fIStatsCache.put(tabIdxIdentifier,cappedPayload); |
| logger.debug("Statistics: fIStatsCache:<{}, {}> (Capped)",tabIdxIdentifier, cappedPayload); |
| } |
| } else { |
| logger.debug("Statistics: Average row size already exists for :<{}, {}>. Skip!",tabIdxIdentifier, payload); |
| } |
| } |
| |
| /* |
| * Convert the given RexNode to a String representation while also replacing the RexInputRef references |
| * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode |
| * expressions may differ in the RexInputRef positions but otherwise the same. |
| * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State) |
| */ |
| private String convertRexToString(RexNode condition, RelDataType rowType) { |
| StringBuilder sb = new StringBuilder(); |
| if (condition == null) { |
| return null; |
| } |
| if (condition.getKind() == SqlKind.AND) { |
| boolean first = true; |
| for(RexNode pred : RelOptUtil.conjunctions(condition)) { |
| if (first) { |
| sb.append(convertRexToString(pred, rowType)); |
| first = false; |
| } else { |
| sb.append(" " + SqlKind.AND.toString() + " "); |
| sb.append(convertRexToString(pred, rowType)); |
| } |
| } |
| return sb.toString(); |
| } else if (condition.getKind() == SqlKind.OR) { |
| boolean first = true; |
| for(RexNode pred : RelOptUtil.disjunctions(condition)) { |
| if (first) { |
| sb.append(convertRexToString(pred, rowType)); |
| first = false; |
| } else { |
| sb.append(" " + SqlKind.OR.toString() + " "); |
| sb.append(convertRexToString(pred, rowType)); |
| } |
| } |
| return sb.toString(); |
| } else { |
| HashMap<String, String> inputRefMapping = new HashMap<>(); |
| /* Based on the rel projection the input reference for the same column may change |
| * during planning. We want the cache to be agnostic to it. Hence, the entry stored |
| * in the cache has the input reference ($i) replaced with the column name |
| */ |
| getInputRefMapping(condition, rowType, inputRefMapping); |
| if (inputRefMapping.keySet().size() > 0) { |
| //Found input ref - replace it |
| String replCondition = condition.toString(); |
| for (String inputRef : inputRefMapping.keySet()) { |
| replCondition = replCondition.replace(inputRef, inputRefMapping.get(inputRef)); |
| } |
| return replCondition; |
| } else { |
| return condition.toString(); |
| } |
| } |
| } |
| |
| /* |
| * Generate the input reference to column mapping for reference replacement. Please |
| * look at the usage in convertRexToString() to understand why this mapping is required. |
| */ |
| private void getInputRefMapping(RexNode condition, RelDataType rowType, |
| HashMap<String, String> mapping) { |
| if (condition instanceof RexCall) { |
| for (RexNode op : ((RexCall) condition).getOperands()) { |
| getInputRefMapping(op, rowType, mapping); |
| } |
| } else if (condition instanceof RexInputRef) { |
| mapping.put(condition.toString(), |
| rowType.getFieldNames().get(condition.hashCode())); |
| } |
| } |
| |
| /* |
| * Additional pre-processing may be required for LIKE/CAST predicates in order to compute statistics. |
| * e.g. A LIKE predicate should be converted to a RANGE predicate for statistics computation. MapR-DB |
| * does not yet support computing statistics for LIKE predicates. |
| */ |
| private RexNode convertToStatsCondition(RexNode condition, IndexDescriptor index, |
| IndexCallContext context, RelNode scanRel, List<SqlKind>typesToProcess) { |
| RexBuilder builder = scanRel.getCluster().getRexBuilder(); |
| if (condition.getKind() == SqlKind.AND) { |
| final List<RexNode> conditions = Lists.newArrayList(); |
| for(RexNode pred : RelOptUtil.conjunctions(condition)) { |
| conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess)); |
| } |
| return RexUtil.composeConjunction(builder, conditions, false); |
| } else if (condition.getKind() == SqlKind.OR) { |
| final List<RexNode> conditions = Lists.newArrayList(); |
| for(RexNode pred : RelOptUtil.disjunctions(condition)) { |
| conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess)); |
| } |
| return RexUtil.composeDisjunction(builder, conditions, false); |
| } else if (condition instanceof RexCall) { |
| // LIKE operator - convert to a RANGE predicate, if possible |
| if (typesToProcess.contains(SqlKind.LIKE) |
| && ((RexCall) condition).getOperator().getKind() == SqlKind.LIKE) { |
| return convertLikeToRange((RexCall)condition, builder); |
| } else if (typesToProcess.contains(SqlKind.CAST) |
| && hasCastExpression(condition)) { |
| return convertCastForFIdx(((RexCall) condition), index, context, scanRel); |
| } |
| else { |
| return condition; |
| } |
| } |
| return condition; |
| } |
| |
| /* |
| * Determines whether the given expression contains a CAST expression. Assumes that the |
| * given expression is a valid expression. |
| * Returns TRUE, if it finds at least one instance of CAST operator. |
| */ |
| private boolean hasCastExpression(RexNode condition) { |
| if (condition instanceof RexCall) { |
| if (((RexCall) condition).getOperator().getKind() == SqlKind.CAST) { |
| return true; |
| } |
| for (RexNode op : ((RexCall) condition).getOperands()) { |
| if (hasCastExpression(op)) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| /* |
| * CAST expressions are not understood by MAPR-DB as-is. Hence, we must convert them before passing them |
| * onto MAPR-DB for statistics. Given a functional index, the given expression is converted into an |
| * expression on the `expression` column of the functional index. |
| */ |
| private RexNode convertCastForFIdx(RexCall condition, IndexDescriptor index, |
| IndexCallContext context, RelNode origScan) { |
| if (index == null) { |
| return condition; |
| } |
| FunctionalIndexInfo functionInfo = index.getFunctionalInfo(); |
| if (!functionInfo.hasFunctional()) { |
| return condition; |
| } |
| // The functional index has a different row-type than the original scan. Use the index row-type when |
| // converting the condition |
| RelDataType newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(origScan, context, functionInfo); |
| RexBuilder builder = origScan.getCluster().getRexBuilder(); |
| return FunctionalIndexHelper.convertConditionForIndexScan(condition, |
| origScan, newRowType, builder, functionInfo); |
| } |
| |
| /* |
| * Helper function to perform additional pre-processing for LIKE predicates |
| */ |
| private RexNode convertLikeToRange(RexCall condition, RexBuilder builder) { |
| Preconditions.checkArgument(condition.getOperator().getKind() == SqlKind.LIKE, |
| "Unable to convertLikeToRange: argument is not a LIKE condition!"); |
| HBaseRegexParser parser = null; |
| RexNode arg = null; |
| RexLiteral pattern = null, escape = null; |
| String patternStr = null, escapeStr = null; |
| if (condition.getOperands().size() == 2) { |
| // No escape character specified |
| for (RexNode op : condition.getOperands()) { |
| if (op.getKind() == SqlKind.LITERAL) { |
| pattern = (RexLiteral) op; |
| } else { |
| arg = op; |
| } |
| } |
| // Get the PATTERN strings from the corresponding RexLiteral |
| if (pattern.getTypeName() == SqlTypeName.DECIMAL || |
| pattern.getTypeName() == SqlTypeName.INTEGER) { |
| patternStr = pattern.getValue().toString(); |
| } else if (pattern.getTypeName() == SqlTypeName.CHAR) { |
| patternStr = pattern.getValue2().toString(); |
| } |
| if (patternStr != null) { |
| parser = new HBaseRegexParser(patternStr); |
| } |
| } else if (condition.getOperands().size() == 3) { |
| // Escape character specified |
| for (RexNode op : condition.getOperands()) { |
| if (op.getKind() == SqlKind.LITERAL) { |
| // Assume first literal specifies PATTERN and the second literal specifies the ESCAPE char |
| if (pattern == null) { |
| pattern = (RexLiteral) op; |
| } else { |
| escape = (RexLiteral) op; |
| } |
| } else { |
| arg = op; |
| } |
| } |
| // Get the PATTERN and ESCAPE strings from the corresponding RexLiteral |
| if (pattern.getTypeName() == SqlTypeName.DECIMAL || |
| pattern.getTypeName() == SqlTypeName.INTEGER) { |
| patternStr = pattern.getValue().toString(); |
| } else if (pattern.getTypeName() == SqlTypeName.CHAR) { |
| patternStr = pattern.getValue2().toString(); |
| } |
| if (escape.getTypeName() == SqlTypeName.DECIMAL || |
| escape.getTypeName() == SqlTypeName.INTEGER) { |
| escapeStr = escape.getValue().toString(); |
| } else if (escape.getTypeName() == SqlTypeName.CHAR) { |
| escapeStr = escape.getValue2().toString(); |
| } |
| if (patternStr != null && escapeStr != null) { |
| parser = new HBaseRegexParser(patternStr, escapeStr.toCharArray()[0]); |
| } |
| } |
| if (parser != null) { |
| parser.parse(); |
| String prefix = parser.getPrefixString(); |
| /* |
| * If there is a literal prefix, convert it into an EQUALITY or RANGE predicate |
| */ |
| if (prefix != null) { |
| if (prefix.equals(parser.getLikeString())) { |
| // No WILDCARD present. This turns the LIKE predicate to EQUALITY predicate |
| if (arg != null) { |
| return builder.makeCall(SqlStdOperatorTable.EQUALS, arg, pattern); |
| } |
| } else { |
| // WILDCARD present. This turns the LIKE predicate to RANGE predicate |
| byte[] startKey = HConstants.EMPTY_START_ROW; |
| byte[] stopKey = HConstants.EMPTY_END_ROW; |
| startKey = prefix.getBytes(Charsets.UTF_8); |
| stopKey = startKey.clone(); |
| boolean isMaxVal = true; |
| for (int i = stopKey.length - 1; i >= 0; --i) { |
| int nextByteValue = (0xff & stopKey[i]) + 1; |
| if (nextByteValue < 0xff) { |
| stopKey[i] = (byte) nextByteValue; |
| isMaxVal = false; |
| break; |
| } else { |
| stopKey[i] = 0; |
| } |
| } |
| if (isMaxVal) { |
| stopKey = HConstants.EMPTY_END_ROW; |
| } |
| // TODO: This maybe a potential bug since we assume UTF-8 encoding. However, we follow the |
| // current DB implementation. See HBaseFilterBuilder.createHBaseScanSpec "like" CASE statement |
| RexLiteral startKeyLiteral = builder.makeLiteral(new String(startKey, |
| Charsets.UTF_8)); |
| RexLiteral stopKeyLiteral = builder.makeLiteral(new String(stopKey, |
| Charsets.UTF_8)); |
| if (arg != null) { |
| RexNode startPred = builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, |
| arg, startKeyLiteral); |
| RexNode stopPred = builder.makeCall(SqlStdOperatorTable.LESS_THAN, arg, stopKeyLiteral); |
| return builder.makeCall(SqlStdOperatorTable.AND, startPred, stopPred); |
| } |
| } |
| } |
| } |
| // Could not convert - return condition as-is. |
| return condition; |
| } |
| |
| /* |
| * Compute the selectivity of the given rowCondition. Retrieve the selectivity |
| * for index conditions from the cache |
| */ |
| private Pair<Double, Boolean> computeSelectivity(RexNode condition, IndexDescriptor idx, double totalRows, |
| RelNode scanRel, Map<String, Double> baseConditionMap) { |
| double selectivity; |
| boolean guess = false; |
| if (totalRows <= 0) { |
| return new Pair<>(1.0, true); |
| } |
| String conditionAsStr = convertRexToString(condition, scanRel.getRowType()); |
| if (condition.getKind() == SqlKind.AND) { |
| selectivity = 1.0; |
| for (RexNode pred : RelOptUtil.conjunctions(condition)) { |
| Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap); |
| if (selPayload.left > 0) { |
| // At least one AND branch is a guess |
| if (selPayload.right == true) { |
| guess = true; |
| } |
| selectivity *= selPayload.left; |
| } |
| } |
| } else if (condition.getKind() == SqlKind.OR) { |
| selectivity = 0.0; |
| for (RexNode pred : RelOptUtil.disjunctions(condition)) { |
| Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap); |
| if (selPayload.left > 0.0) { |
| // At least one OR branch is a guess |
| if (selPayload.right == true) { |
| guess = true; |
| } |
| selectivity += selPayload.left; |
| } |
| } |
| // Cap selectivity of OR'ed predicates at 0.25 if at least one predicate is a guess (Calcite does the same) |
| if (guess && selectivity > 0.25) { |
| selectivity = 0.25; |
| } |
| } else { |
| guess = false; |
| if (baseConditionMap.get(conditionAsStr) != null) { |
| double rowCount = baseConditionMap.get(conditionAsStr); |
| if (rowCount != -1.0) { |
| selectivity = rowCount / totalRows; |
| } else { |
| // Ignore |
| selectivity = -1.0; |
| guess = true; |
| } |
| } else { |
| selectivity = RelMdUtil.guessSelectivity(condition); |
| guess = true; |
| } |
| return new Pair<>(selectivity, guess); |
| } |
| // Cap selectivity to be between 0.0 and 1.0 |
| selectivity = Math.min(1.0, selectivity); |
| selectivity = Math.max(0.0, selectivity); |
| logger.debug("Statistics: computeSelectivity: Cache MISS: Computed {} -> {}", conditionAsStr, selectivity); |
| return new Pair<>(selectivity, guess); |
| } |
| |
| /* |
| * Filters out indexes from the given collection based on the row key of indexes i.e. after filtering |
| * the given collection would contain only one index for each distinct row key in the collection |
| */ |
| private IndexCollection distinctFKeyIndexes(IndexCollection indexes, RelNode scanRel) { |
| IndexCollection distinctIdxCollection = new DrillIndexCollection(scanRel, new HashSet<DrillIndexDescriptor>()); |
| Iterator<IndexDescriptor> iterator = indexes.iterator(); |
| Map<String, List<IndexDescriptor>> firstColIndexMap = new HashMap<>(); |
| while (iterator.hasNext()) { |
| IndexDescriptor index = iterator.next(); |
| // If index has columns - the first column is the leading column for the index |
| if (index.getIndexColumns() != null) { |
| List<IndexDescriptor> idxList; |
| String firstCol = convertLExToStr(index.getIndexColumns().get(0)); |
| if (firstColIndexMap.get(firstCol) != null) { |
| idxList = firstColIndexMap.get(firstCol); |
| } else { |
| idxList = new ArrayList<>(); |
| } |
| idxList.add(index); |
| firstColIndexMap.put(firstCol, idxList); |
| } |
| } |
| for (String firstCol : firstColIndexMap.keySet()) { |
| List<IndexDescriptor> indexesSameFirstCol = firstColIndexMap.get(firstCol); |
| double maxAvgRowSize = -1.0; |
| IndexDescriptor selectedIdx = null; |
| for (IndexDescriptor idx : indexesSameFirstCol) { |
| String tabIdxIdentifier = buildUniqueIndexIdentifier(idx); |
| double idxRowSize = fIStatsCache.get(tabIdxIdentifier).getAvgRowSize(); |
| // Prefer index with largest average row-size, breaking ties lexicographically |
| if (idxRowSize > maxAvgRowSize |
| || (idxRowSize == maxAvgRowSize |
| && (selectedIdx == null || idx.getIndexName().compareTo(selectedIdx.getIndexName()) < 0))) { |
| maxAvgRowSize = idxRowSize; |
| selectedIdx = idx; |
| } |
| } |
| assert (selectedIdx != null); |
| distinctIdxCollection.addIndex(selectedIdx); |
| } |
| return distinctIdxCollection; |
| } |
| |
| /* |
| * Returns the String representation for the given Logical Expression |
| */ |
| private String convertLExToStr(LogicalExpression lex) { |
| StringBuilder sb = new StringBuilder(); |
| ExpressionStringBuilder esb = new ExpressionStringBuilder(); |
| lex.accept(esb, sb); |
| return sb.toString(); |
| } |
| |
| /* |
| * Converts the given RexNode condition into a Drill logical expression. |
| */ |
| private LogicalExpression convertToLogicalExpression(RexNode condition, |
| RelDataType type, PlannerSettings settings, RexBuilder builder) { |
| LogicalExpression conditionExp; |
| try { |
| conditionExp = DrillOptiq.toDrill(new DrillParseContext(settings), type, builder, condition); |
| } catch (ClassCastException e) { |
| return null; |
| } |
| return conditionExp; |
| } |
| } |