blob: b636e32d94998e63427934937590af8c14478abd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.physical;
import java.util.List;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.physical.base.DbGroupScan;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
import org.apache.drill.exec.planner.common.JoinControl;
import org.apache.drill.exec.planner.logical.DrillJoin;
import org.apache.drill.exec.planner.logical.DrillPushRowKeyJoinToScanRule;
import org.apache.drill.exec.planner.logical.RowKeyJoinRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
// Abstract base class for the physical join planning rules.
public abstract class JoinPruleBase extends Prule {
// Physical join implementation requested by a caller; the plan-building methods
// in this class branch on this value to pick traits and the Prel to construct.
protected enum PhysicalJoinType {HASH_JOIN, MERGE_JOIN, NESTEDLOOP_JOIN}
/**
 * Forwards both arguments unchanged to the {@link Prule} superclass constructor.
 *
 * @param operand     rule operand handed to the superclass
 * @param description rule description handed to the superclass
 */
protected JoinPruleBase(RelOptRuleOperand operand, String description) {
super(operand, description);
}
/**
 * Classifies the join condition with {@link JoinUtils#getJoinCategory} and reports
 * whether it is something other than a cartesian or inequality join.
 *
 * @param join     join whose condition is classified
 * @param left     left input of the join
 * @param right    right input of the join
 * @param settings planner settings (not consulted by this implementation)
 * @return {@code false} when the category is {@code CARTESIAN} or {@code INEQUALITY},
 *         {@code true} otherwise
 */
protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode right,
    PlannerSettings settings) {
  // Scratch lists required by the getJoinCategory signature; they are not read afterwards here.
  final List<Integer> leftJoinKeys = Lists.newArrayList();
  final List<Integer> rightJoinKeys = Lists.newArrayList();
  final List<Boolean> nullFilters = Lists.newArrayList();

  final JoinCategory joinCategory = JoinUtils.getJoinCategory(
      left, right, join.getCondition(), leftJoinKeys, rightJoinKeys, nullFilters);

  return joinCategory != JoinCategory.CARTESIAN
      && joinCategory != JoinCategory.INEQUALITY;
}
/**
 * Wraps every key in a {@link DistributionField}, preserving the input order.
 *
 * @param keys keys to wrap, one distribution field is produced per entry
 * @return a new mutable list with one {@link DistributionField} per key
 */
protected List<DistributionField> getDistributionField(List<Integer> keys) {
  final List<DistributionField> fields = Lists.newArrayListWithCapacity(keys.size());
  for (int fieldOrdinal : keys) {
    fields.add(new DistributionField(fieldOrdinal));
  }
  return fields;
}
/**
 * Decides whether a broadcast of the right input may be considered for this join.
 *
 * <p>All three conditions must hold:
 * <ol>
 *   <li>the estimated row count of {@code right} is below the broadcast threshold
 *       from the planner settings of the join's cluster;</li>
 *   <li>the distribution trait of {@code left} is not {@code SINGLETON};</li>
 *   <li>the join type is {@code INNER} or {@code LEFT}.</li>
 * </ol>
 *
 * @param planner planner instance (not consulted by this implementation)
 * @param join    join being planned
 * @param left    left input of the join
 * @param right   right input of the join
 * @return {@code true} when every condition above holds
 */
protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoin join, RelNode left, RelNode right) {
  final double rightRowEstimate = RelMetadataQuery.instance().getRowCount(right);

  // Expressed as a negated '<' so that a NaN estimate is rejected, as a plain '<' test would.
  if (!(rightRowEstimate < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold())) {
    return false;
  }

  final boolean leftIsSingleton = left.getTraitSet()
      .getTrait(DrillDistributionTraitDef.INSTANCE)
      .equals(DrillDistributionTrait.SINGLETON);
  if (leftIsSingleton) {
    return false;
  }

  final JoinRelType joinType = join.getJoinType();
  return joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT;
}
/**
 * Registers a join plan whose right input is range-distributed on the single right join key.
 *
 * <p>The range partition function is obtained from the {@link DbGroupScan} of the scan
 * returned by {@link DrillPushRowKeyJoinToScanRule#getValidJoinInput} for the left input.
 * Only {@link PhysicalJoinType#HASH_JOIN} is implemented; for any other type the method
 * returns without touching {@code call}, instead of handing null trait sets to
 * {@code convert}.
 *
 * @param call                  rule call that receives the new join via {@code transformTo}
 * @param join                  row-key join being planned; must have exactly one right key
 * @param physicalJoinType      requested physical join; only {@code HASH_JOIN} produces a plan
 * @param implementAsRowKeyJoin {@code true} to build a {@link RowKeyJoinPrel},
 *                              {@code false} to build a {@link HashJoinPrel}
 * @param left                  left input of the join
 * @param right                 right input of the join
 * @param collationLeft         not used by this implementation
 * @param collationRight        not used by this implementation
 * @throws InvalidRelException declared for the Prel constructors invoked here
 */
protected void createRangePartitionRightPlan(RelOptRuleCall call, RowKeyJoinRel join,
    PhysicalJoinType physicalJoinType, boolean implementAsRowKeyJoin, RelNode left, RelNode right,
    RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException {
  // No trait sets are defined for other join types, so there is nothing to convert or register.
  if (physicalJoinType != PhysicalJoinType.HASH_JOIN) {
    return;
  }

  assert join.getRightKeys().size() == 1 : "Cannot create range partition plan with multi-column join condition";

  int joinKeyRight = join.getRightKeys().get(0);
  List<DrillDistributionTrait.DistributionField> rangeDistFields =
      Lists.newArrayList(new DrillDistributionTrait.DistributionField(joinKeyRight /* `rowkey equivalent` ordinal on the right side */));

  // The partition function is looked up by the name of the right-side join column.
  List<FieldReference> rangeDistRefList = Lists.newArrayList();
  FieldReference rangeDistRef =
      FieldReference.getWithQuotedRef(right.getRowType().getFieldList().get(joinKeyRight).getName());
  rangeDistRefList.add(rangeDistRef);

  RelNode leftScan = DrillPushRowKeyJoinToScanRule.getValidJoinInput(left);
  DrillDistributionTrait rangePartRight = new DrillDistributionTrait(
      DrillDistributionTrait.DistributionType.RANGE_DISTRIBUTED,
      ImmutableList.copyOf(rangeDistFields),
      ((DbGroupScan) ((DrillScanRelBase) leftScan).getGroupScan()).getRangePartitionFunction(rangeDistRefList));

  // Only the right side receives the range distribution; the left side just becomes physical.
  final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
  final RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(rangePartRight);

  final RelNode convertedLeft = convert(left, traitsLeft);
  final RelNode convertedRight = convert(right, traitsRight);

  final DrillJoinRelBase newJoin;
  if (implementAsRowKeyJoin) {
    newJoin = new RowKeyJoinPrel(join.getCluster(), traitsLeft,
        convertedLeft, convertedRight, join.getCondition(),
        join.getJoinType(), join.isSemiJoin());
  } else {
    newJoin = new HashJoinPrel(join.getCluster(), traitsLeft,
        convertedLeft, convertedRight, join.getCondition(),
        join.getJoinType(), false /* no swap */,
        null /* no runtime filter */,
        true /* useful for join-restricted scans */,
        JoinControl.DEFAULT, join.isSemiJoin());
  }
  call.transformTo(newJoin);
}
/**
 * Registers join plans in which both inputs are hash-distributed on the join keys.
 *
 * <p>If the join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, the candidate plans are:
 * <ul>
 *   <li>Plan1: left distributed by (l1, l2, ..., l_k), right by (r1, r2, ..., r_k);</li>
 *   <li>Plan2: left distributed by l1, right by r1;</li>
 *   <li>Plan3: left distributed by l2, right by r2;</li>
 *   <li>...</li>
 *   <li>Plan_(k+1): left distributed by l_k, right by r_k.</li>
 * </ul>
 * Plan1 is always created. Plan2 .. Plan_(k+1) are created only when {@code hashSingleKey}
 * is set and there is more than one join key.
 *
 * @param call             rule call that receives the generated plans
 * @param join             join being planned
 * @param physicalJoinType physical join implementation to build
 * @param left             left input of the join
 * @param right            right input of the join
 * @param collationLeft    collation for the left input, passed through to the plan builder
 * @param collationRight   collation for the right input, passed through to the plan builder
 * @param hashSingleKey    whether to also enumerate the single-key distribution plans
 * @throws InvalidRelException propagated from the plan builder
 */
protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
    PhysicalJoinType physicalJoinType,
    RelNode left, RelNode right,
    RelCollation collationLeft, RelCollation collationRight,
    boolean hashSingleKey) throws InvalidRelException {
  // Plan1: distribute each side on its complete key list.
  createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight,
      hashDistributionOn(join.getLeftKeys()), hashDistributionOn(join.getRightKeys()));

  assert (join.getLeftKeys().size() == join.getRightKeys().size());

  // Single-key plans only add something when requested and when there are several keys.
  if (!hashSingleKey || join.getLeftKeys().size() <= 1) {
    return;
  }

  final int keyCount = join.getLeftKeys().size();
  for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
    // Plan_(keyIdx + 2): distribute each side on its keyIdx-th key alone.
    createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight,
        hashDistributionOn(join.getLeftKeys().subList(keyIdx, keyIdx + 1)),
        hashDistributionOn(join.getRightKeys().subList(keyIdx, keyIdx + 1)));
  }
}

// Builds a HASH_DISTRIBUTED trait over the given join keys.
private DrillDistributionTrait hashDistributionOn(List<Integer> keys) {
  return new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
      ImmutableList.copyOf(getDistributionField(keys)));
}
/**
 * Creates a join plan with both the left and the right child hash distributed.
 *
 * <p>For {@link PhysicalJoinType#MERGE_JOIN} a collation must be provided for both children;
 * it is added to the requested traits of each side. For {@link PhysicalJoinType#HASH_JOIN}
 * only the hash distribution is requested, and collation is removed from the traits of the
 * resulting join via {@code PrelUtil.removeCollation}. Any other join type is not supported
 * by this method: it returns without touching {@code call}, rather than passing null trait
 * sets to {@code convert} and a null node to {@code transformTo}.
 *
 * @param call               rule call that receives the new join via {@code transformTo}
 * @param join               join being planned
 * @param physicalJoinType   {@code MERGE_JOIN} or {@code HASH_JOIN}
 * @param left               left input of the join
 * @param right              right input of the join
 * @param collationLeft      collation for the left input; required for merge join
 * @param collationRight     collation for the right input; required for merge join
 * @param hashLeftPartition  hash distribution requested for the left input
 * @param hashRightPartition hash distribution requested for the right input
 * @throws InvalidRelException declared for the Prel constructors invoked here
 */
private void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
    PhysicalJoinType physicalJoinType,
    RelNode left, RelNode right,
    RelCollation collationLeft, RelCollation collationRight,
    DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException {
  final boolean isMergeJoin = physicalJoinType == PhysicalJoinType.MERGE_JOIN;
  // No traits are defined for other join types, so there is nothing to convert or register.
  if (!isMergeJoin && physicalJoinType != PhysicalJoinType.HASH_JOIN) {
    return;
  }

  final RelTraitSet traitsLeft;
  final RelTraitSet traitsRight;
  if (isMergeJoin) {
    assert collationLeft != null && collationRight != null;
    traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(hashLeftPartition);
    traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(hashRightPartition);
  } else {
    traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashLeftPartition);
    traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashRightPartition);
  }

  final RelNode convertedLeft = convert(left, traitsLeft);
  final RelNode convertedRight = convert(right, traitsRight);

  final DrillJoinRelBase newJoin;
  if (isMergeJoin) {
    newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
        convertedLeft, convertedRight, join.getCondition(),
        join.getJoinType(), join.isSemiJoin());
  } else {
    final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
    newJoin = new HashJoinPrel(join.getCluster(), traitSet,
        convertedLeft, convertedRight, join.getCondition(),
        join.getJoinType(), join.isSemiJoin());
  }
  call.transformTo(newJoin);
}
/**
 * Creates a join plan whose right child is BROADCAST distributed. No distribution trait is
 * added for the left child; it is only converted to the Drill physical convention (plus the
 * left collation for a merge join).
 *
 * <p>If the physical join type is {@link PhysicalJoinType#MERGE_JOIN}, a collation must be
 * provided for both the left and the right child; it is added to the traits requested from
 * each side. For {@link PhysicalJoinType#HASH_JOIN} collation is removed from the traits of
 * the resulting join via {@code PrelUtil.removeCollation}.
 *
 * @param call             rule call that receives the new join via {@code transformTo}
 * @param join             join being planned
 * @param joinCondition    condition to use for the generated join
 * @param physicalJoinType physical join implementation to build
 * @param left             left input of the join
 * @param right            right input of the join; requested as broadcast distributed
 * @param collationLeft    collation for the left input; required for merge join
 * @param collationRight   collation for the right input; required for merge join
 * @throws InvalidRelException declared for the Prel constructors invoked here
 */
protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoin join,
    final RexNode joinCondition,
    final PhysicalJoinType physicalJoinType,
    final RelNode left, final RelNode right,
    final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
  DrillDistributionTrait distBroadcastRight =
      new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);

  RelTraitSet traitsRight = null;
  RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
  if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
    assert collationLeft != null && collationRight != null;
    traitsLeft = traitsLeft.plus(collationLeft);
    traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationRight).plus(distBroadcastRight);
  } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN ||
      physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
    traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
  }

  final RelNode convertedLeft = convert(left, traitsLeft);
  final RelNode convertedRight = convert(right, traitsRight);

  // The join takes its traits from the converted left input.
  if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
    call.transformTo(new MergeJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
        convertedRight, joinCondition, join.getJoinType(), join.isSemiJoin()));
  } else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
    final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
    call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, convertedLeft,
        convertedRight, joinCondition, join.getJoinType(), join.isSemiJoin()));
  } else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
    call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
        convertedRight, joinCondition, join.getJoinType()));
  }
}
}