blob: 2636f1408038717c9311ac0b87f9c23bf54cceee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.index.generators;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollations;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.physical.base.IndexGroupScan;
import org.apache.drill.exec.planner.common.JoinControl;
import org.apache.drill.exec.planner.index.IndexLogicalPlanCallContext;
import org.apache.drill.exec.planner.index.IndexDescriptor;
import org.apache.drill.exec.planner.index.FunctionalIndexInfo;
import org.apache.drill.exec.planner.index.FunctionalIndexHelper;
import org.apache.drill.exec.planner.index.IndexPlanUtils;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.HashJoinPrel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.Prule;
import org.apache.drill.exec.planner.physical.RowKeyJoinPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
 * Generate a non-covering index plan that is equivalent to the original plan. The non-covering plan consists
 * of a join-back between an index lookup and the primary table. This join-back is performed using a rowkey join.
 * For the primary table, we use a restricted scan that allows doing skip-scan instead of sequential scan.
 *
 * Original Plan:
 *               Filter
 *                 |
 *            DBGroupScan
 *
 * New Plan:
 *
 *            RowKeyJoin
 *          /            \
 * Remainder Filter     Exchange
 *         |               |
 *   Restricted         Filter (with index columns only)
 *   DBGroupScan           |
 *                    IndexGroupScan
 *
 * This plan will be further optimized by the filter pushdown rule of the Index plugin which should
 * push the index column filters into the index scan.
 *
 * NOTE(review): in {@link #convertChild} the filter on the left (primary table) side is only created when
 * the index is asynchronous, and it is built from the original condition; confirm the diagram against that.
 */
public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {

  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonCoveringIndexPlanGenerator.class);

  /** Group scan over the index; becomes the right (build) side of the join. */
  protected final IndexGroupScan indexGroupScan;

  /** Descriptor of the index being used (collation, sync/async, functional info). */
  private final IndexDescriptor indexDesc;

  // Ideally This functionInfo should be cached along with indexDesc.
  protected final FunctionalIndexInfo functionInfo;

  /**
   * @param indexContext       planner call context handed to the parent generator
   * @param indexDesc          descriptor of the index to use; its functional info is read here
   * @param indexGroupScan     group scan over the index; {@link #convertChild} returns null if this is null
   * @param indexCondition     condition evaluated against the index scan
   * @param remainderCondition condition handed to the parent generator
   * @param builder            Rex builder used to create the join condition
   * @param settings           planner settings (force-sort and hash-join options are consulted)
   */
  public NonCoveringIndexPlanGenerator(IndexLogicalPlanCallContext indexContext,
                                       IndexDescriptor indexDesc,
                                       IndexGroupScan indexGroupScan,
                                       RexNode indexCondition,
                                       RexNode remainderCondition,
                                       RexBuilder builder,
                                       PlannerSettings settings) {
    super(indexContext, indexCondition, remainderCondition, builder, settings);
    this.indexGroupScan = indexGroupScan;
    this.indexDesc = indexDesc;
    this.functionInfo = indexDesc.getFunctionalInfo();
  }

  /**
   * Builds the join-back plan: (restricted primary-table scan) JOIN (index scan projecting the row key),
   * followed by a project that drops the join's row-key columns, an optional copy of the upper project,
   * and an optional sort node.
   *
   * @param topRel the rel whose cluster is used for the join node
   * @param input  not read by this implementation
   * @return the converted plan, or null when the index group scan or the restricted group scan is unavailable
   * @throws InvalidRelException if one of the physical rels cannot be created
   */
  @Override
  public RelNode convertChild(final RelNode topRel, final RelNode input) throws InvalidRelException {
    if (indexGroupScan == null) {
      logger.error("Null indexgroupScan in NonCoveringIndexPlanGenerator.convertChild");
      return null;
    }

    RelDataType dbscanRowType = convertRowType(origScan.getRowType(), origScan.getCluster().getTypeFactory());
    RelDataType indexScanRowType = FunctionalIndexHelper.convertRowTypeForIndexScan(
        origScan, indexContext.getOrigMarker(), indexGroupScan, functionInfo);

    DrillDistributionTrait partition = IndexPlanUtils.scanIsPartition(IndexPlanUtils.getGroupScan(origScan))
        ? DrillDistributionTrait.RANDOM_DISTRIBUTED
        : DrillDistributionTrait.SINGLETON;

    ScanPrel indexScanPrel = new ScanPrel(origScan.getCluster(),
        origScan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition), indexGroupScan, indexScanRowType,
        origScan.getTable());
    DbGroupScan origDbGroupScan = (DbGroupScan) IndexPlanUtils.getGroupScan(origScan);

    // ---- right (build) side of the rowkey join: do a distribution of project-filter-indexscan subplan ----
    RexNode convertedIndexCondition = FunctionalIndexHelper.convertConditionForIndexScan(indexCondition,
        origScan, indexScanRowType, builder, functionInfo);
    FilterPrel rightIndexFilterPrel = new FilterPrel(indexScanPrel.getCluster(), indexScanPrel.getTraitSet(),
        indexScanPrel, convertedIndexCondition);

    // Row count estimate later attached to the rowkey join
    double finalRowCount = indexGroupScan.getRowCount(indexContext.getOrigCondition(), origScan);

    // project the rowkey column from the index scan
    List<RexNode> rightProjectExprs = Lists.newArrayList();
    int rightRowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan);
    assert rightRowKeyIndex >= 0;
    rightProjectExprs.add(RexInputRef.of(rightRowKeyIndex, indexScanPrel.getRowType()));

    final List<RelDataTypeField> indexScanFields = indexScanPrel.getRowType().getFieldList();
    final RelDataTypeFactory.FieldInfoBuilder rightFieldTypeBuilder =
        indexScanPrel.getCluster().getTypeFactory().builder();

    // build the row type for the right Project
    final RelDataTypeField rightRowKeyField = indexScanFields.get(rightRowKeyIndex);
    rightFieldTypeBuilder.add(rightRowKeyField);
    final RelDataType rightProjectRowType = rightFieldTypeBuilder.build();

    final ProjectPrel rightIndexProjectPrel = new ProjectPrel(indexScanPrel.getCluster(),
        indexScanPrel.getTraitSet(), rightIndexFilterPrel, rightProjectExprs, rightProjectRowType);

    // create a RANGE PARTITION on the right side (this could be removed later during ExcessiveExchangeIdentifier
    // phase if the estimated row count is smaller than slice_target)
    final RelNode rangeDistRight = createRangeDistRight(rightIndexProjectPrel, rightRowKeyField, origDbGroupScan);

    // the range partitioning adds an extra column for the partition id but in the final plan we already have a
    // renaming Project for the _id field inserted as part of the JoinPrelRenameVisitor. Thus, we are not inserting
    // a separate Project here.
    final RelNode convertedRight = rangeDistRight;

    // ---- left (probe) side of the rowkey join ----
    List<SchemaPath> cols = new ArrayList<>(origDbGroupScan.getColumns());
    if (!checkRowKey(cols)) {
      cols.add(origDbGroupScan.getRowKeyPath());
    }

    // Create a restricted groupscan from the primary table's groupscan
    DbGroupScan restrictedGroupScan = (DbGroupScan) origDbGroupScan.getRestrictedScan(cols);
    if (restrictedGroupScan == null) {
      logger.error("Null restricted groupscan in NonCoveringIndexPlanGenerator.convertChild");
      return null;
    }

    // Set left side (restricted scan) row count as rows returned from right side (index scan)
    DrillScanRel rightIdxRel = new DrillScanRel(origScan.getCluster(), origScan.getTraitSet(),
        origScan.getTable(), origScan.getRowType(), indexContext.getScanColumns());
    double rightIdxRowCount = indexGroupScan.getRowCount(indexCondition, rightIdxRel);
    restrictedGroupScan.setRowCount(null, rightIdxRowCount, rightIdxRowCount);

    RelTraitSet origScanTraitSet = origScan.getTraitSet();
    RelTraitSet restrictedScanTraitSet = origScanTraitSet.plus(Prel.DRILL_PHYSICAL);

    // Create the collation traits for restricted scan based on the index columns under the
    // conditions that (a) the index actually has collation property (e.g hash indexes don't)
    // and (b) if an explicit sort operation is not enforced
    RelCollation collation = null;
    if (indexDesc.getCollation() != null && !settings.isIndexForceSortNonCovering()) {
      collation = IndexPlanUtils.buildCollationNonCoveringIndexScan(indexDesc, indexScanRowType, dbscanRowType,
          indexContext);
      if (restrictedScanTraitSet.getTrait(RelCollationTraitDef.INSTANCE) != null) { // replace existing trait
        restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).replace(collation);
      } else { // add new one
        restrictedScanTraitSet = restrictedScanTraitSet.plus(partition).plus(collation);
      }
    }

    ScanPrel dbScan = new ScanPrel(origScan.getCluster(),
        restrictedScanTraitSet, restrictedGroupScan, dbscanRowType, origScan.getTable());
    RelNode lastLeft = dbScan;

    int leftRowKeyIndex = getRowKeyIndex(dbScan.getRowType(), origScan);
    final RelDataTypeField leftRowKeyField = dbScan.getRowType().getFieldList().get(leftRowKeyIndex);

    // We are applying the same index condition to primary table's restricted scan. The reason is, the index may be
    // an async index .. i.e it is not synchronously updated along with the primary table update as part of a single
    // transaction, so it is possible that after or during index scan, the primary table rows may have been updated
    // and no longer satisfy the index condition. By re-applying the index condition here, we will ensure
    // non-qualifying records are filtered out.
    // The remainder condition will be applied on top of RowKeyJoin.
    if (indexDesc.isAsyncIndex()) {
      lastLeft = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
          dbScan, indexContext.getOrigCondition());
    }

    RelDataType origRowType = origProject == null ? origScan.getRowType() : origProject.getRowType();
    // Looked up once: decides both whether the left Project must append the row key and how many trailing
    // row-key columns are dropped after the join.
    final boolean rowKeyMissingFromOrigRowType = getRowKeyIndex(origRowType, origScan) < 0;

    if (origProject != null) {
      // The original plan had a Project above the scan, so replicate it on the left side.
      // new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype]
      final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
          dbScan.getCluster().getTypeFactory().builder();
      leftFieldTypeBuilder.addAll(origRowType.getFieldList());

      // get the exprs from the original Project
      List<RexNode> leftProjectExprs = Lists.newArrayList();
      leftProjectExprs.addAll(IndexPlanUtils.getProjects(origProject));

      // add the rowkey IFF rowkey is not in the original row type
      if (rowKeyMissingFromOrigRowType) {
        leftFieldTypeBuilder.add(leftRowKeyField);
        leftProjectExprs.add(RexInputRef.of(leftRowKeyIndex, dbScan.getRowType()));
      }
      final RelDataType leftProjectRowType = leftFieldTypeBuilder.build();

      // build collation in project
      if (!settings.isIndexForceSortNonCovering()) {
        collation = IndexPlanUtils.buildCollationProject(leftProjectExprs, null, dbScan, functionInfo, indexContext);
      }

      // lastLeft is the async-index filter when one was created, otherwise the restricted scan itself
      lastLeft = new ProjectPrel(dbScan.getCluster(),
          collation != null ? dbScan.getTraitSet().plus(collation) : dbScan.getTraitSet(),
          lastLeft, leftProjectExprs, leftProjectRowType);
    }

    final RelTraitSet leftTraits = dbScan.getTraitSet().plus(Prel.DRILL_PHYSICAL);
    final RelNode convertedLeft = Prule.convert(lastLeft, leftTraits);

    // ---- join the two sides on the row key ----
    // find the rowkey column on the left side of join
    final int leftRowKeyIdx = getRowKeyIndex(convertedLeft.getRowType(), origScan);
    final int rightRowKeyIdx = 0; // only rowkey field is being projected from right side
    assert leftRowKeyIdx >= 0;

    List<Integer> leftJoinKeys = ImmutableList.of(leftRowKeyIdx);
    List<Integer> rightJoinKeys = ImmutableList.of(rightRowKeyIdx);
    RexNode joinCondition = RelOptUtil.createEquiJoinCondition(convertedLeft, leftJoinKeys,
        convertedRight, rightJoinKeys, builder);

    RelNode newRel;
    if (settings.isIndexUseHashJoinNonCovering()) {
      // for hash join, collation will be cleared
      newRel = new HashJoinPrel(topRel.getCluster(), leftTraits, convertedLeft,
          convertedRight, joinCondition, JoinRelType.INNER, false /* no swap */,
          null /* no runtime filter */,
          true /* useful for join-restricted scans */, JoinControl.DEFAULT);
    } else {
      // if there is collation, add to rowkey join
      RowKeyJoinPrel rjPrel = new RowKeyJoinPrel(topRel.getCluster(),
          collation != null ? leftTraits.plus(collation) : leftTraits,
          convertedLeft, convertedRight, joinCondition, JoinRelType.INNER);
      rjPrel.setEstimatedRowCount(finalRowCount);
      newRel = rjPrel;
    }

    // Drop one trailing column when the original row type already carries the row key, two when it does not
    // (presumably the right-side row key plus the one appended to the left Project; verify against the join
    // row type).
    final int toRemoveRowKeyCount = rowKeyMissingFromOrigRowType ? 2 : 1;
    newRel = projectAwayTrailingRowKeys(newRel, toRemoveRowKeyCount);

    if (upperProject != null) {
      newRel = capWithUpperProject(newRel);
    }

    // whether to remove sort
    if (indexContext.getSort() != null) {
      // When ordering is required, serialize the index scan side. With parallel index scans, the rowkey join may
      // receive unsorted input because ordering is not guaranteed across different parallel inputs.
      if (toRemoveSort(indexContext.getCollation(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
        ((IndexGroupScan) indexScanPrel.getGroupScan()).setParallelizationWidth(1);
      }
      newRel = getSortNode(indexContext, newRel, false, true, true);
      Preconditions.checkArgument(newRel != null);
    }

    RelNode finalRel = Prule.convert(newRel, newRel.getTraitSet());

    // Pass the rels themselves so toString() only runs when debug logging is enabled.
    logger.debug("NonCoveringIndexPlanGenerator got finalRel {} from origScan {}", finalRel, origScan);
    return finalRel;
  }

  /**
   * Wraps {@code joinRel} in a Project that keeps all but its last {@code trailingColumnsToDrop} fields,
   * referencing each kept field by position.
   */
  private ProjectPrel projectAwayTrailingRowKeys(RelNode joinRel, int trailingColumnsToDrop)
      throws InvalidRelException {
    final List<RelDataTypeField> joinFields = joinRel.getRowType().getFieldList();
    final int keptFieldCount = joinFields.size() - trailingColumnsToDrop;

    final RelDataTypeFactory.FieldInfoBuilder finalFieldTypeBuilder =
        origScan.getCluster().getTypeFactory().builder();
    finalFieldTypeBuilder.addAll(joinFields.subList(0, keptFieldCount));
    final RelDataType finalProjectRowType = finalFieldTypeBuilder.build();

    final List<RexNode> resetExprs = Lists.newArrayList();
    for (int idx = 0; idx < keptFieldCount; ++idx) {
      resetExprs.add(RexInputRef.of(idx, joinRel.getRowType()));
    }

    return new ProjectPrel(joinRel.getCluster(), joinRel.getTraitSet(),
        joinRel, resetExprs, finalProjectRowType);
  }

  /**
   * Re-creates the upper Project on top of {@code child}, carrying the distribution trait of the upper
   * Project's input and, unless an explicit sort is forced, a collation derived from its expressions.
   */
  private ProjectPrel capWithUpperProject(RelNode child) throws InvalidRelException {
    RelCollation newCollation = RelCollations.of(RelCollations.EMPTY.getFieldCollations());
    DrillDistributionTrait newDist =
        upperProject.getInput().getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
    if (!settings.isIndexForceSortNonCovering()) {
      newCollation = IndexPlanUtils.buildCollationProject(IndexPlanUtils.getProjects(upperProject), origProject,
          origScan, functionInfo, indexContext);
    }
    RelTraitSet newProjectTraits = newTraitSet(Prel.DRILL_PHYSICAL, newDist, newCollation);
    return new ProjectPrel(upperProject.getCluster(),
        newProjectTraits,
        child, IndexPlanUtils.getProjects(upperProject), upperProject.getRowType());
  }
}