| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.planner.index; |
| |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.calcite.plan.RelOptCost; |
| import org.apache.calcite.plan.RelOptPlanner; |
| import org.apache.calcite.rel.RelFieldCollation.NullDirection; |
| import org.apache.drill.common.expression.SchemaPath; |
| import org.apache.drill.exec.expr.CloneVisitor; |
| import org.apache.drill.exec.physical.base.DbGroupScan; |
| import org.apache.drill.exec.physical.base.GroupScan; |
| import org.apache.drill.exec.planner.cost.DrillCostBase; |
| import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory; |
| import org.apache.drill.exec.planner.cost.PluginCost; |
| import org.apache.drill.exec.store.mapr.PluginConstants; |
| import org.apache.drill.exec.util.EncodedSchemaPathSet; |
| import org.apache.drill.common.expression.LogicalExpression; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| public class MapRDBIndexDescriptor extends DrillIndexDescriptor { |
| |
| protected final Object desc; |
| protected final Set<LogicalExpression> allFields; |
| protected final Set<LogicalExpression> indexedFields; |
| protected MapRDBFunctionalIndexInfo functionalInfo; |
| protected PluginCost pluginCost; |
| |
| public MapRDBIndexDescriptor(List<LogicalExpression> indexCols, |
| CollationContext indexCollationContext, |
| List<LogicalExpression> nonIndexCols, |
| List<LogicalExpression> rowKeyColumns, |
| String indexName, |
| String tableName, |
| IndexType type, |
| Object desc, |
| DbGroupScan scan, |
| NullDirection nullsDirection) { |
| super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection); |
| this.desc = desc; |
| this.indexedFields = ImmutableSet.copyOf(indexColumns); |
| this.allFields = new ImmutableSet.Builder<LogicalExpression>() |
| .add(PluginConstants.DOCUMENT_SCHEMA_PATH) |
| .addAll(indexColumns) |
| .addAll(nonIndexColumns) |
| .build(); |
| this.pluginCost = scan.getPluginCostModel(); |
| } |
| |
| public Object getOriginalDesc(){ |
| return desc; |
| } |
| |
| @Override |
| public boolean isCoveringIndex(List<LogicalExpression> expressions) { |
| List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions); |
| return columnsInIndexFields(decodedCols, allFields); |
| } |
| |
| @Override |
| public boolean allColumnsIndexed(Collection<LogicalExpression> expressions) { |
| List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions); |
| return columnsInIndexFields(decodedCols, indexedFields); |
| } |
| |
| @Override |
| public boolean someColumnsIndexed(Collection<LogicalExpression> columns) { |
| return columnsIndexed(columns, false); |
| } |
| |
| private boolean columnsIndexed(Collection<LogicalExpression> expressions, boolean allColsIndexed) { |
| List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions); |
| if (allColsIndexed) { |
| return columnsInIndexFields(decodedCols, indexedFields); |
| } else { |
| return someColumnsInIndexFields(decodedCols, indexedFields); |
| } |
| } |
| |
| public FunctionalIndexInfo getFunctionalInfo() { |
| if (this.functionalInfo == null) { |
| this.functionalInfo = new MapRDBFunctionalIndexInfo(this); |
| } |
| return this.functionalInfo; |
| } |
| |
| /** |
| * Search through a LogicalExpression, finding all referenced schema paths |
| * and replace them with decoded paths. |
| * If one encoded path could be decoded to multiple paths, add these decoded paths to |
| * the end of returned list of expressions from parseExpressions. |
| */ |
| private class DecodePathinExpr extends CloneVisitor { |
| Set<SchemaPath> schemaPathSet = Sets.newHashSet(); |
| |
| public List<LogicalExpression> parseExpressions(Collection<LogicalExpression> expressions) { |
| List<LogicalExpression> allCols = Lists.newArrayList(); |
| Collection<SchemaPath> decoded; |
| |
| for (LogicalExpression expr : expressions) { |
| LogicalExpression nonDecoded = expr.accept(this, null); |
| if (nonDecoded != null) { |
| allCols.add(nonDecoded); |
| } |
| } |
| |
| if (schemaPathSet.size() > 0) { |
| decoded = EncodedSchemaPathSet.decode(schemaPathSet); |
| allCols.addAll(decoded); |
| } |
| return allCols; |
| } |
| |
| @Override |
| public LogicalExpression visitSchemaPath(SchemaPath path, Void value) { |
| if (EncodedSchemaPathSet.isEncodedSchemaPath(path)) { |
| // if decoded size is not one, incoming path is encoded path thus there is no cast or other function applied on it, |
| // since users won't pass in encoded fields, so it is safe to return null, |
| schemaPathSet.add(path); |
| return null; |
| } else { |
| return path; |
| } |
| } |
| |
| } |
| |
| @Override |
| public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner, |
| int numProjectedFields, GroupScan primaryTableGroupScan) { |
| Preconditions.checkArgument(primaryTableGroupScan instanceof DbGroupScan); |
| DbGroupScan dbGroupScan = (DbGroupScan) primaryTableGroupScan; |
| DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); |
| double totalRows = indexProps.getTotalRows(); |
| double leadRowCount = indexProps.getLeadingSelectivity() * totalRows; |
| double avgRowSize = indexProps.getAvgRowSize(); |
| if (indexProps.isCovering()) { // covering index |
| // int numIndexCols = allFields.size(); |
| // for disk i/o, all index columns are going to be read into memory |
| double numBlocks = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan)); |
| double diskCost = numBlocks * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan); |
| // cpu cost is cost of filter evaluation for the remainder condition |
| double cpuCost = 0.0; |
| if (indexProps.getTotalRemainderFilter() != null) { |
| cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST; |
| } |
| double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost |
| return costFactory.makeCost(leadRowCount, cpuCost, diskCost, networkCost); |
| |
| } else { // non-covering index |
| // int numIndexCols = allFields.size(); |
| double numBlocksIndex = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan)); |
| double diskCostIndex = numBlocksIndex * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan); |
| // for the primary table join-back each row may belong to a different block, so in general num_blocks = num_rows; |
| // however, num_blocks cannot exceed the total number of blocks of the table |
| double totalBlocksPrimary = Math.ceil((dbGroupScan.getColumns().size() * |
| pluginCost.getAverageColumnSize(primaryTableGroupScan) * totalRows)/ |
| pluginCost.getBlockSize(primaryTableGroupScan)); |
| double diskBlocksPrimary = Math.min(totalBlocksPrimary, leadRowCount); |
| double diskCostPrimary = diskBlocksPrimary * pluginCost.getRandomBlockReadCost(primaryTableGroupScan); |
| double diskCostTotal = diskCostIndex + diskCostPrimary; |
| |
| // cpu cost of remainder condition evaluation over the selected rows |
| double cpuCost = 0.0; |
| if (indexProps.getTotalRemainderFilter() != null) { |
| cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST; |
| } |
| double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost |
| return costFactory.makeCost(leadRowCount, cpuCost, diskCostTotal, networkCost); |
| } |
| } |
| |
| // Future use once full table scan also includes network cost |
| private double getNetworkCost(double leadRowCount, int numProjectedFields, boolean isCovering, |
| GroupScan primaryTableGroupScan) { |
| if (isCovering) { |
| // db server will send only the projected columns to the db client for the selected |
| // number of rows, so network cost is based on the number of actual projected columns |
| double networkCost = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan); |
| return networkCost; |
| } else { |
| // only the rowkey column is projected from the index and sent over the network |
| double networkCostIndex = leadRowCount * 1 * pluginCost.getAverageColumnSize(primaryTableGroupScan); |
| |
| // after join-back to primary table, all projected columns are sent over the network |
| double networkCostPrimary = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan); |
| |
| return networkCostIndex + networkCostPrimary; |
| } |
| |
| } |
| |
| @Override |
| public PluginCost getPluginCostModel() { |
| return pluginCost; |
| } |
| } |