blob: 638016c14794d057ce90d8ccceec93b23ec65911 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.index.generators;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.physical.base.IndexGroupScan;
import org.apache.drill.exec.planner.common.JoinControl;
import org.apache.drill.exec.planner.index.IndexLogicalPlanCallContext;
import org.apache.drill.exec.planner.index.IndexDescriptor;
import org.apache.drill.exec.planner.index.FunctionalIndexInfo;
import org.apache.drill.exec.planner.index.FunctionalIndexHelper;
import org.apache.drill.exec.planner.index.IndexPlanUtils;
import org.apache.drill.exec.planner.index.IndexConditionInfo;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.HashJoinPrel;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.Prule;
import org.apache.drill.exec.planner.physical.RowKeyJoinPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* IndexIntersectPlanGenerator generates an index plan that intersects multiple index tables.
* The input indexes are assumed to be already ranked by selectivity (low to high).
*/
public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IndexIntersectPlanGenerator.class);
final Map<IndexDescriptor, IndexConditionInfo> indexInfoMap;
/**
 * Creates a plan generator that works on several indexes at once.
 *
 * @param indexContext call context forwarded to the parent generator
 * @param indexInfoMap condition info per candidate index; the reference is stored as-is (no copy)
 * @param builder      RexBuilder forwarded to the parent generator
 * @param settings     planner settings forwarded to the parent generator
 */
public IndexIntersectPlanGenerator(
    IndexLogicalPlanCallContext indexContext,
    Map<IndexDescriptor, IndexConditionInfo> indexInfoMap,
    RexBuilder builder,
    PlannerSettings settings) {
  // The second and third superclass arguments are deliberately passed as null here.
  super(indexContext, null, null, builder, settings);
  this.indexInfoMap = indexInfoMap;
}
/**
 * Builds an INNER equi-join between {@code left} and {@code right} on their row key columns and
 * wraps it in a project.
 *
 * <p>The left join key is the position returned by {@code getRowKeyIndex} for the left row type;
 * the right join key is always column 0.
 *
 * @param left         left input of the join
 * @param right        right input of the join; its column 0 is used as the join key
 * @param isRowKeyJoin when true the join is a RowKeyJoinPrel (or a HashJoinPrel if
 *                     {@code settings.isIndexUseHashJoinNonCovering()} is set) topped by
 *                     {@link #buildOriginalProject}; when false it is a HashJoinPrel topped by
 *                     {@link #buildRowKeyProject}
 * @param htControl    value handed to the HashJoinPrel constructor
 * @return the project placed on top of the created join
 * @throws InvalidRelException declared by this method; presumably raised by the join Prel
 *                             constructors - confirm against those classes
 */
public RelNode buildRowKeyJoin(RelNode left, RelNode right, boolean isRowKeyJoin, int htControl)
    throws InvalidRelException {
  final int leftRowKeyIdx = getRowKeyIndex(left.getRowType(), origScan);
  final int rightRowKeyIdx = 0; // only rowkey field is being projected from right side
  assert leftRowKeyIdx >= 0;

  final List<Integer> leftJoinKeys = ImmutableList.of(leftRowKeyIdx);
  final List<Integer> rightJoinKeys = ImmutableList.of(rightRowKeyIdx);

  // Parameterized logging: the message is only assembled when trace is enabled.
  logger.trace("buildRowKeyJoin: leftIdx: {}, rightIdx: {}", leftRowKeyIdx, rightRowKeyIdx);

  final RexNode joinCondition =
      RelOptUtil.createEquiJoinCondition(left, leftJoinKeys, right, rightJoinKeys, builder);

  if (isRowKeyJoin && !settings.isIndexUseHashJoinNonCovering()) {
    final RowKeyJoinPrel rjPrel = new RowKeyJoinPrel(left.getCluster(), left.getTraitSet(),
        left, right, joinCondition, JoinRelType.INNER);
    // since there is a restricted Scan on left side, assume original project
    return buildOriginalProject(rjPrel);
  }

  // Either a row key join that was asked to use hash join, or a regular (non-restricted) join.
  final HashJoinPrel hjPrel = new HashJoinPrel(left.getCluster(), left.getTraitSet(), left,
      right, joinCondition, JoinRelType.INNER, false /* no swap */, null /* no runtime filter */,
      isRowKeyJoin, htControl);

  if (isRowKeyJoin) {
    // since there is a restricted Scan on left side, assume original project
    return buildOriginalProject(hjPrel);
  }
  // there is no restricted scan on left, only the row key is carried forward
  return buildRowKeyProject(hjPrel, leftRowKeyIdx);
}
/**
 * Puts a single-column project on top of {@code inputRel} that keeps only the field found at
 * {@code fieldIndex} in the input row type.
 *
 * @param inputRel   relational node to project from
 * @param fieldIndex position (in {@code inputRel}'s field list) of the field to keep
 * @return a ProjectPrel over {@code inputRel} carrying the input's trait set
 */
public RelNode buildRowKeyProject(RelNode inputRel, int fieldIndex) {
  final RelDataTypeField keyField = inputRel.getRowType().getFieldList().get(fieldIndex);

  // The only projected expression is an input reference to the selected field.
  final List<RexNode> projectExprs = Lists.newArrayList();
  projectExprs.add(builder.makeInputRef(keyField.getType(), keyField.getIndex()));

  // Output row type consists of exactly that one field.
  final RelDataTypeFactory.FieldInfoBuilder rowTypeBuilder =
      inputRel.getCluster().getTypeFactory().builder();
  rowTypeBuilder.add(keyField);
  final RelDataType outputRowType = rowTypeBuilder.build();

  return new ProjectPrel(
      inputRel.getCluster(), inputRel.getTraitSet(), inputRel, projectExprs, outputRowType);
}
/**
 * Tops {@code newRel} with a project that passes through its leading columns and drops the
 * trailing row key column(s).
 *
 * <p>One trailing column is dropped when the original row type (the original project's, or the
 * original scan's if there is no project) already contains the row key; two are dropped when it
 * does not.
 *
 * @param newRel node whose output should be trimmed
 * @return the trimming ProjectPrel after conversion through {@code Prule.convert} with its own
 *         trait set
 */
public RelNode buildOriginalProject(RelNode newRel) {
  final RelDataType origRowType;
  if (origProject != null) {
    origRowType = origProject.getRowType();
  } else {
    origRowType = origScan.getRowType();
  }

  final RelDataTypeFactory.FieldInfoBuilder trimmedTypeBuilder =
      origScan.getCluster().getTypeFactory().builder();
  final List<RelDataTypeField> joinFields = newRel.getRowType().getFieldList();

  // Number of trailing row key columns to strip from the join output.
  final int dropCount = getRowKeyIndex(origRowType, origScan) < 0 ? 2 : 1;
  final int keepCount = joinFields.size() - dropCount;

  trimmedTypeBuilder.addAll(joinFields.subList(0, keepCount));
  final RelDataType trimmedRowType = trimmedTypeBuilder.build();

  // Identity references for every column that is kept.
  final List<RexNode> passThroughRefs = Lists.newArrayList();
  int column = 0;
  while (column < keepCount) {
    passThroughRefs.add(RexInputRef.of(column, newRel.getRowType()));
    column++;
  }

  final RelNode trimmed = new ProjectPrel(newRel.getCluster(), newRel.getTraitSet(),
      newRel, passThroughRefs, trimmedRowType);
  return Prule.convert(trimmed, trimmed.getTraitSet());
}
/**
 * Looks up the functional-index information attached to an index.
 *
 * @param descriptor index to query
 * @return whatever {@code descriptor.getFunctionalInfo()} yields
 */
private FunctionalIndexInfo getFunctionalIndexInfo(IndexDescriptor descriptor) {
  final FunctionalIndexInfo functionalInfo = descriptor.getFunctionalInfo();
  return functionalInfo;
}
/**
 * Builds the plan for one index taking part in the intersection and, if a build side already
 * exists, joins the two on the row key.
 *
 * <p>The index side is always {@code ScanPrel(index) -> FilterPrel(index condition) ->
 * ProjectPrel(row key only)}. When {@code right} is null that sub-plan is converted and returned.
 * Otherwise it becomes the left input of {@link #buildRowKeyJoin} (called with
 * {@code isRowKeyJoin = false} and {@code JoinControl.INTERSECT_DISTINCT}).
 *
 * @param pair                 index descriptor together with the condition to apply on that index
 * @param right                plan built so far that serves as build side, or null if this is
 *                             the first index being processed
 * @param generateDistribution when true and {@code right}'s distribution trait is not SINGLETON,
 *                             the join result is converted to a broadcast-distributed trait set
 * @return the converted index sub-plan, or the join of it with {@code right}
 * @throws InvalidRelException propagated from {@link #buildRowKeyJoin}
 */
public RelNode buildIntersectPlan(Map.Entry<IndexDescriptor, RexNode> pair, RelNode right,
    boolean generateDistribution) throws InvalidRelException {
  IndexDescriptor index = pair.getKey();
  RexNode condition = pair.getValue();

  FunctionalIndexInfo functionInfo = getFunctionalIndexInfo(index);
  IndexGroupScan indexScan = index.getIndexGroupScan();
  RelDataType indexScanRowType = FunctionalIndexHelper.convertRowTypeForIndexScan(
      origScan, indexContext.getOrigMarker(), indexScan, functionInfo);
  // The distribution of the index scan follows whether the original group scan is partitioned.
  DrillDistributionTrait partition =
      IndexPlanUtils.scanIsPartition(IndexPlanUtils.getGroupScan(origScan))
          ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;

  ScanPrel indexScanPrel = new ScanPrel(origScan.getCluster(),
      origScan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition), indexScan, indexScanRowType,
      origScan.getTable());
  FilterPrel indexFilterPrel = new FilterPrel(indexScanPrel.getCluster(), indexScanPrel.getTraitSet(),
      indexScanPrel, FunctionalIndexHelper.convertConditionForIndexScan(condition, origScan,
      indexScanRowType, builder, functionInfo));

  // project the rowkey column from the index scan
  List<RexNode> indexProjectExprs = Lists.newArrayList();
  int rowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan);
  assert rowKeyIndex >= 0;
  indexProjectExprs.add(RexInputRef.of(rowKeyIndex, indexScanPrel.getRowType()));

  // build the row type for the right Project
  final RelDataTypeFactory.FieldInfoBuilder rightFieldTypeBuilder =
      indexScanPrel.getCluster().getTypeFactory().builder();
  final List<RelDataTypeField> indexScanFields = indexScanPrel.getRowType().getFieldList();
  final RelDataTypeField rightRowKeyField = indexScanFields.get(rowKeyIndex);
  rightFieldTypeBuilder.add(rightRowKeyField);
  final RelDataType indexProjectRowType = rightFieldTypeBuilder.build();

  final ProjectPrel indexProjectPrel = new ProjectPrel(indexScanPrel.getCluster(),
      indexScanPrel.getTraitSet(), indexFilterPrel, indexProjectExprs, indexProjectRowType);

  RelTraitSet rightSideTraits = newTraitSet().plus(Prel.DRILL_PHYSICAL);
  RelMetadataQuery mq = indexProjectPrel.getCluster().getMetadataQuery();

  // if build(right) side does not exist, this index scan is the right most.
  if (right == null) {
    if (partition == DrillDistributionTrait.RANDOM_DISTRIBUTED
        && settings.getSliceTarget() < indexProjectPrel.estimateRowCount(mq)) {
      final DrillDistributionTrait distRight =
          new DrillDistributionTrait(DistributionType.BROADCAST_DISTRIBUTED);
      rightSideTraits = newTraitSet(distRight).plus(Prel.DRILL_PHYSICAL);
    }
    return Prule.convert(indexProjectPrel, rightSideTraits);
  }

  // if build(right) side exist, the converted plan is left (probe). Intersect with right(build) side
  RelNode converted = Prule.convert(indexProjectPrel, rightSideTraits);
  RelNode finalRel = buildRowKeyJoin(converted, right, false, JoinControl.INTERSECT_DISTINCT);

  if (generateDistribution
      && right.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE) != DrillDistributionTrait.SINGLETON) {
    final DrillDistributionTrait distRight =
        new DrillDistributionTrait(DistributionType.BROADCAST_DISTRIBUTED);
    rightSideTraits = newTraitSet(distRight).plus(Prel.DRILL_PHYSICAL);
    // This join will serve as the right side for the next intersection join, if applicable
    finalRel = Prule.convert(finalRel, rightSideTraits);
  }

  // Pass the objects themselves so toString() only runs when trace logging is enabled.
  logger.trace("IndexIntersectPlanGenerator got finalRel {} from origScan {}",
      finalRel, origScan);
  return finalRel;
}
/**
 * Builds the left (probe) side of the final row key join: a restricted scan of the primary
 * table, optionally topped by a filter and by a copy of the original project.
 *
 * <p>Resulting shape (bottom-up):
 * <ol>
 *   <li>ScanPrel over a restricted group scan whose columns are the original scan's columns plus
 *       the row key path if {@code checkRowKey} reports it missing;</li>
 *   <li>if {@code isAnyIndexAsync}: a FilterPrel re-applying {@code indexContext.getOrigCondition()};</li>
 *   <li>if there is an original project: a ProjectPrel with the original project's fields and
 *       expressions, plus the row key when the original project's row type does not contain it.</li>
 * </ol>
 *
 * @param remnant         not read by this method
 * @param isAnyIndexAsync whether the original condition must be re-applied above the scan
 * @return pair of (converted left plan, restricted group scan), or null when the primary table's
 *         group scan does not provide a restricted scan
 */
private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant,
    boolean isAnyIndexAsync) {
  DbGroupScan origDbGroupScan = (DbGroupScan) IndexPlanUtils.getGroupScan(origScan);
  List<SchemaPath> cols = new ArrayList<>(origDbGroupScan.getColumns());
  if (!checkRowKey(cols)) {
    cols.add(origDbGroupScan.getRowKeyPath());
  }

  // Create a restricted groupscan from the primary table's groupscan
  DbGroupScan restrictedGroupScan = origDbGroupScan.getRestrictedScan(cols);
  if (restrictedGroupScan == null) {
    logger.error("Null restricted groupscan in IndexIntersectPlanGenerator.convertChild");
    return null;
  }

  DrillDistributionTrait partition =
      IndexPlanUtils.scanIsPartition(IndexPlanUtils.getGroupScan(origScan))
          ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;

  RelDataType dbscanRowType = convertRowType(origScan.getRowType(), origScan.getCluster().getTypeFactory());
  ScanPrel dbScan = new ScanPrel(origScan.getCluster(),
      origScan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition), restrictedGroupScan,
      dbscanRowType, origScan.getTable());

  // Tracks the current top of the left-side plan as operators are stacked on the scan.
  RelNode lastRelNode = dbScan;

  // build the row type for the left Project
  List<RexNode> leftProjectExprs = Lists.newArrayList();
  int leftRowKeyIndex = getRowKeyIndex(dbScan.getRowType(), origScan);
  final RelDataTypeField leftRowKeyField = dbScan.getRowType().getFieldList().get(leftRowKeyIndex);
  final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
      dbScan.getCluster().getTypeFactory().builder();

  // See NonCoveringIndexPlanGenerator for why we are re-applying index filter condition in case of async indexes.
  // For intersect planning, any one of the intersected indexes may be async but to keep it simple we re-apply the
  // full original condition.
  if (isAnyIndexAsync) {
    // The filter must be kept and become the new top of the plan; dropping it would both lose
    // the re-applied condition and leave the plan top null.
    lastRelNode = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
        dbScan, indexContext.getOrigCondition());
  }

  // new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype]
  if (origProject != null) {
    RelDataType origRowType = origProject.getRowType();
    List<RelDataTypeField> origProjFields = origRowType.getFieldList();
    leftFieldTypeBuilder.addAll(origProjFields);
    // get the exprs from the original Project IFF there is a project
    leftProjectExprs.addAll(IndexPlanUtils.getProjects(origProject));
    // add the rowkey IFF rowkey is not in orig scan
    if (getRowKeyIndex(origRowType, origScan) < 0) {
      leftFieldTypeBuilder.add(leftRowKeyField);
      leftProjectExprs.add(RexInputRef.of(leftRowKeyIndex, dbScan.getRowType()));
    }
    final RelDataType leftProjectRowType = leftFieldTypeBuilder.build();
    // The project sits on the filter when one was added, otherwise directly on the scan.
    lastRelNode = new ProjectPrel(dbScan.getCluster(), dbScan.getTraitSet(),
        lastRelNode, leftProjectExprs, leftProjectRowType);
  }

  final RelTraitSet leftTraits = dbScan.getTraitSet().plus(Prel.DRILL_PHYSICAL);
  final RelNode convertedLeft = Prule.convert(lastRelNode, leftTraits);
  return new Pair<>(convertedLeft, restrictedGroupScan);
}
/**
 * Assembles the complete intersect plan: an index-side plan that intersects every index in
 * {@code indexInfoMap}, range-distributed and joined on the row key with a restricted scan of
 * the primary table, optionally capped by the upper project.
 *
 * <p>Neither {@code filter} nor {@code input} is read by this implementation.
 *
 * @param filter unused here
 * @param input  unused here
 * @return the final join, or a ProjectPrel over it when an upper project exists
 * @throws InvalidRelException if there is no index to plan with, if no restricted scan of the
 *                             primary table can be built, or if propagated from the join builders
 */
@Override
public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException {
  // Insertion-ordered so indexes are processed in the order supplied by indexInfoMap.
  final Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap();
  boolean isAnyIndexAsync = false;
  for (Map.Entry<IndexDescriptor, IndexConditionInfo> entry : indexInfoMap.entrySet()) {
    idxConditionMap.put(entry.getKey(), entry.getValue().indexCondition);
    isAnyIndexAsync = isAnyIndexAsync || entry.getKey().isAsyncIndex();
  }

  if (idxConditionMap.isEmpty()) {
    // Without at least one index there is no index-side plan to join against.
    throw new InvalidRelException("No index available to build an index intersect plan");
  }

  RelNode indexPlan = null;
  RexNode remnant = indexContext.getFilterCondition();
  final int lastIdx = idxConditionMap.size() - 1;
  int curIdx = 0;
  for (Map.Entry<IndexDescriptor, RexNode> pair : idxConditionMap.entrySet()) {
    // For the last index, the generated join is distributed using createRangeDistRight instead!
    final boolean generateDistribution = curIdx < lastIdx;
    indexPlan = buildIntersectPlan(pair, indexPlan, generateDistribution);
    remnant = indexInfoMap.get(pair.getKey()).remainderCondition;
    ++curIdx;
  }

  final RelDataTypeField rightRowKeyField = indexPlan.getRowType().getFieldList().get(0);
  final RelNode rangeDistRight = createRangeDistRight(indexPlan, rightRowKeyField,
      (DbGroupScan) IndexPlanUtils.getGroupScan(origScan));

  // now with index plan constructed, build plan of left(probe) side to use restricted db scan
  final Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync);
  if (leftRelAndScan == null) {
    // buildRestrictedDBScan returns null when the primary table offers no restricted scan.
    throw new InvalidRelException(
        "Unable to build a restricted scan of the primary table for index intersect plan");
  }

  RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT);
  if (upperProject != null) {
    // Re-create the upper project on top of the new join.
    finalRel = new ProjectPrel(finalRel.getCluster(), finalRel.getTraitSet(),
        finalRel, IndexPlanUtils.getProjects(upperProject), upperProject.getRowType());
  }
  return finalRel;
}
}