blob: d41a1473b42a5496c6ab273d4f03ff431f7d6460 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.physical;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillWindowRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import java.util.List;
public class WindowPrule extends Prule {
public static final RelOptRule INSTANCE = new WindowPrule();
private WindowPrule() {
super(RelOptHelper.some(DrillWindowRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.WindowPrule");
}
@Override
public void onMatch(RelOptRuleCall call) {
final DrillWindowRel window = call.rel(0);
RelNode input = call.rel(1);
// TODO: Order window based on existing partition by
//input.getTraitSet().subsumes()
boolean partitionby = false;
boolean addMerge = false;
// The start index of the constant fields of DrillWindowRel
final int startConstantsIndex = window.getInput().getRowType().getFieldCount();
int constantShiftIndex = 0;
for (final Ord<Window.Group> w : Ord.zip(window.groups)) {
Window.Group windowBase = w.getValue();
RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
// For empty Over-Clause
if(windowBase.keys.isEmpty()
&& windowBase.orderKeys.getFieldCollations().isEmpty()) {
DrillDistributionTrait distEmptyKeys =
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.SINGLETON);
traits = traits.plus(distEmptyKeys);
} else if (windowBase.keys.size() > 0) {
DrillDistributionTrait distOnAllKeys =
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionFields(windowBase)));
partitionby = true;
traits = traits.plus(distOnAllKeys);
} else if (windowBase.orderKeys.getFieldCollations().size() > 0) {
// if only the order-by clause is specified, there is a single partition
// consisting of all the rows, so we do a distributed sort followed by a
// single merge as the input of the window operator
DrillDistributionTrait distKeys =
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionFieldsFromCollation(windowBase)));
traits = traits.plus(distKeys);
if(!isSingleMode(call)) {
addMerge = true;
}
}
// Add collation trait if either partition-by or order-by is specified.
if (partitionby || windowBase.orderKeys.getFieldCollations().size() > 0) {
RelCollation collation = getCollation(windowBase);
traits = traits.plus(collation);
}
RelNode convertedInput = convert(input, traits);
if (addMerge) {
traits = traits.plus(DrillDistributionTrait.SINGLETON);
convertedInput = new SingleMergeExchangePrel(window.getCluster(), traits,
convertedInput, windowBase.collation());
}
List<RelDataTypeField> newRowFields = Lists.newArrayList();
newRowFields.addAll(convertedInput.getRowType().getFieldList());
Iterable<RelDataTypeField> newWindowFields = Iterables.filter(window.getRowType().getFieldList(), new Predicate<RelDataTypeField>() {
@Override
public boolean apply(RelDataTypeField relDataTypeField) {
return relDataTypeField.getName().startsWith("w" + w.i + "$");
}
});
for(RelDataTypeField newField : newWindowFields) {
newRowFields.add(newField);
}
RelDataType rowType = new RelRecordType(newRowFields);
List<Window.RexWinAggCall> newWinAggCalls = Lists.newArrayList();
for(Ord<Window.RexWinAggCall> aggOrd : Ord.zip(windowBase.aggCalls)) {
Window.RexWinAggCall aggCall = aggOrd.getValue();
// If the argument points at the constant and
// additional fields have been generated by the Window below,
// the index of constants will be shifted
final List<RexNode> newOperandsOfWindowFunction = Lists.newArrayList();
for(RexNode operand : aggCall.getOperands()) {
if(operand instanceof RexInputRef) {
final RexInputRef rexInputRef = (RexInputRef) operand;
final int refIndex = rexInputRef.getIndex();
// Check if this RexInputRef points at the constants
if(rexInputRef.getIndex() >= startConstantsIndex) {
operand = new RexInputRef(refIndex + constantShiftIndex,
window.constants.get(refIndex - startConstantsIndex).getType());
}
}
newOperandsOfWindowFunction.add(operand);
}
aggCall = new Window.RexWinAggCall(
(SqlAggFunction) aggCall.getOperator(),
aggCall.getType(),
newOperandsOfWindowFunction,
aggCall.ordinal,
aggCall.distinct);
newWinAggCalls.add(new Window.RexWinAggCall(
(SqlAggFunction)aggCall.getOperator(), aggCall.getType(), aggCall.getOperands(), aggOrd.i, aggCall.distinct)
);
}
windowBase = new Window.Group(
windowBase.keys,
windowBase.isRows,
windowBase.lowerBound,
windowBase.upperBound,
windowBase.orderKeys,
newWinAggCalls
);
input = new WindowPrel(
window.getCluster(),
window.getTraitSet().merge(traits),
convertedInput,
window.getConstants(),
rowType,
windowBase);
constantShiftIndex += windowBase.aggCalls.size();
}
call.transformTo(input);
}
/**
* Create a RelCollation that has partition-by as the leading keys followed by order-by keys
* @param window The window specification
* @return a RelCollation with {partition-by keys, order-by keys}
*/
private RelCollation getCollation(Window.Group window) {
List<RelFieldCollation> fields = Lists.newArrayList();
for (int group : BitSets.toIter(window.keys)) {
fields.add(new RelFieldCollation(group));
}
fields.addAll(window.orderKeys.getFieldCollations());
return RelCollations.of(fields);
}
private List<DrillDistributionTrait.DistributionField> getDistributionFields(Window.Group window) {
List<DrillDistributionTrait.DistributionField> groupByFields = Lists.newArrayList();
for (int group : BitSets.toIter(window.keys)) {
DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group);
groupByFields.add(field);
}
return groupByFields;
}
private List<DistributionField> getDistributionFieldsFromCollation(Window.Group window) {
List<DistributionField> distFields = Lists.newArrayList();
for (RelFieldCollation relField : window.collation().getFieldCollations()) {
DistributionField field = new DistributionField(relField.getFieldIndex());
distFields.add(field);
}
return distFields;
}
}