blob: a6d9719fa2319b6d4c5497b3c151b004cf8ba889 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.physical;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.rel.RelCollations;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
 * Planner rule that matches a {@link DrillProjectRel} and registers equivalent
 * {@link ProjectPrel} nodes.
 *
 * <p>The traits of each generated {@link ProjectPrel} are derived from the traits of its
 * input: distribution and collation fields are re-mapped from input column indexes to the
 * output column indexes of the projection, and are weakened when the projection does not
 * carry the required columns through.
 */
public class ProjectPrule extends Prule {
  public static final RelOptRule INSTANCE = new ProjectPrule();

  private ProjectPrule() {
    super(RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(RelNode.class)), "ProjectPrule");
  }

  /**
   * Converts the matched logical project into physical projects.
   *
   * <p>First tries to build projects on top of the input via {@link ProjectTraitPull};
   * if that reports {@code false}, falls back to a single {@link ProjectPrel} that simply
   * reuses the trait set of the converted input.
   *
   * @param call the rule call; {@code call.rel(0)} is the matched {@link DrillProjectRel}
   */
  @Override
  public void onMatch(RelOptRuleCall call) {
    final DrillProjectRel project = call.rel(0);
    final RelNode input = project.getInput();

    final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
    final RelNode convertedInput = convert(input, traits);

    // Maintain two separate maps, one for the distribution trait and one for the collation trait.
    // For now, the only difference between them is how a CAST affects trait propagation:
    // a CAST of an input column preserves distribution but not collation.
    final Map<Integer, Integer> distributionMap = getDistributionMap(project);
    final Map<Integer, Integer> collationMap = getCollationMap(project);

    final boolean traitPull = new ProjectTraitPull(call, distributionMap, collationMap).go(project, convertedInput);

    if (!traitPull) {
      call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput,
          project.getProjects(), project.getRowType()));
    }
  }

  /**
   * Builds a {@link ProjectPrel} on top of a given child, translating the child's
   * distribution and collation traits through the projection.
   */
  private class ProjectTraitPull extends SubsetTransformer<DrillProjectRel, RuntimeException> {
    /** Input column index to output column index, for columns that preserve distribution. */
    final Map<Integer, Integer> distributionMap;
    /** Input column index to output column index, for columns that preserve collation. */
    final Map<Integer, Integer> collationMap;

    public ProjectTraitPull(RelOptRuleCall call, Map<Integer, Integer> distributionMap,
        Map<Integer, Integer> collationMap) {
      super(call);
      this.distributionMap = distributionMap;
      this.collationMap = collationMap;
    }

    /**
     * Creates the physical project for one child.
     *
     * @param project the logical project being converted
     * @param rel the child whose traits are pulled up through the projection
     * @return a {@link ProjectPrel} over {@code rel} carrying the re-mapped traits
     */
    @Override
    public RelNode convertChild(DrillProjectRel project, RelNode rel) throws RuntimeException {
      final DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
      final RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);

      final DrillDistributionTrait newDist = convertDist(childDist, distributionMap);
      final RelCollation newCollation = convertRelCollation(childCollation, collationMap);
      final RelTraitSet newProjectTraits = newTraitSet(Prel.DRILL_PHYSICAL, newDist, newCollation);
      return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(),
          project.getRowType());
    }
  }

  /**
   * Re-maps a distribution trait from input column indexes to output column indexes.
   *
   * @param srcDist distribution trait of the project's input
   * @param inToOut input column index to output column index
   * @return a trait of the same type over the re-mapped fields when every distribution field
   *         survives the projection; otherwise {@code SINGLETON} if the source was a singleton,
   *         or {@code RANDOM_DISTRIBUTED} for any other source type
   */
  private DrillDistributionTrait convertDist(DrillDistributionTrait srcDist, Map<Integer, Integer> inToOut) {
    final List<DistributionField> newFields = Lists.newArrayList();

    for (DistributionField field : srcDist.getFields()) {
      final Integer outIndex = inToOut.get(field.getFieldId());
      if (outIndex != null) {
        newFields.add(new DistributionField(outIndex));
      }
    }

    // After the projection, if the new distribution field list is empty, or is only a subset of the
    // original distribution fields, the original distribution can no longer be expressed on the
    // output columns; replace it with either SINGLETON or RANDOM_DISTRIBUTED.
    if (newFields.isEmpty() || newFields.size() < srcDist.getFields().size()) {
      if (srcDist.getType() != DistributionType.SINGLETON) {
        return DrillDistributionTrait.RANDOM_DISTRIBUTED;
      } else {
        return DrillDistributionTrait.SINGLETON;
      }
    } else {
      return new DrillDistributionTrait(srcDist.getType(), ImmutableList.copyOf(newFields));
    }
  }

  /**
   * Re-maps a collation from input column indexes to output column indexes.
   *
   * <p>Only the leading run of sort keys that survive the projection is kept. A collation is
   * an ordered list of keys, so once a key is missing from the output the keys after it no
   * longer describe a valid ordering of the projected rows (rows sorted on {@code ($0, $1)}
   * are not sorted on {@code $1} alone).
   *
   * @param src collation of the project's input
   * @param inToOut input column index to output column index
   * @return the collation over output columns, or the empty collation if the first sort key
   *         is not projected
   */
  private RelCollation convertRelCollation(RelCollation src, Map<Integer, Integer> inToOut) {
    final List<RelFieldCollation> newFields = Lists.newArrayList();

    for (RelFieldCollation field : src.getFieldCollations()) {
      final Integer outIndex = inToOut.get(field.getFieldIndex());
      if (outIndex == null) {
        // This sort key is dropped by the projection; everything after it is only ordered
        // within groups of the dropped key, so the usable prefix ends here.
        break;
      }
      newFields.add(new RelFieldCollation(outIndex, field.getDirection(), field.nullDirection));
    }

    if (newFields.isEmpty()) {
      return RelCollations.of();
    } else {
      return RelCollations.of(newFields);
    }
  }

  /**
   * Builds the input-to-output column mapping used to propagate the distribution trait.
   *
   * <p>An output expression contributes a mapping when it is a direct input reference or a
   * CAST whose first operand is a direct input reference. If the same input column is
   * projected more than once, the last matching output position wins.
   *
   * @param project the logical project
   * @return input column index to output column index
   */
  private Map<Integer, Integer> getDistributionMap(DrillProjectRel project) {
    final Map<Integer, Integer> m = new HashMap<>();

    for (Ord<RexNode> node : Ord.zip(project.getProjects())) {
      // For distribution, either $0 or cast($0 as ...) would keep the distribution after projection.
      if (node.e instanceof RexInputRef) {
        m.put(((RexInputRef) node.e).getIndex(), node.i);
      } else if (node.e.isA(SqlKind.CAST)) {
        final RexNode operand = ((RexCall) node.e).getOperands().get(0);
        if (operand instanceof RexInputRef) {
          m.put(((RexInputRef) operand).getIndex(), node.i);
        }
      }
    }
    return m;
  }

  /**
   * Builds the input-to-output column mapping used to propagate the collation trait.
   *
   * <p>Only direct input references contribute a mapping. If the same input column is
   * projected more than once, the last output position wins.
   *
   * @param project the logical project
   * @return input column index to output column index
   */
  private Map<Integer, Integer> getCollationMap(DrillProjectRel project) {
    final Map<Integer, Integer> m = new HashMap<>();

    for (Ord<RexNode> node : Ord.zip(project.getProjects())) {
      // For collation, only $0 will keep the sort-ness after projection.
      if (node.e instanceof RexInputRef) {
        m.put(((RexInputRef) node.e).getIndex(), node.i);
      }
    }
    return m;
  }
}