blob: 275dd48697a0e9299b5d0fe3e19cbc8aafcef0d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.physical;
import com.google.common.collect.Lists;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.planner.common.DrillWindowRelBase;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.util.BitSets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import static com.google.common.base.Preconditions.checkState;
public class WindowPrel extends DrillWindowRelBase implements Prel {
public WindowPrel(RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
List<RexLiteral> constants,
RelDataType rowType,
Group window) {
super(cluster, traits, child, constants, rowType, Collections.singletonList(window));
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
final RelDataType copiedRowType = deriveCopiedRowTypeFromInput(sole(inputs));
return new WindowPrel(getCluster(), traitSet, sole(inputs), constants, copiedRowType, groups.get(0));
}
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getInput();
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
final List<String> childFields = getInput().getRowType().getFieldNames();
// We don't support distinct partitions
checkState(groups.size() == 1, "Only one window is expected in WindowPrel");
Group window = groups.get(0);
List<NamedExpression> withins = Lists.newArrayList();
List<NamedExpression> aggs = Lists.newArrayList();
List<Order.Ordering> orderings = Lists.newArrayList();
for (int group : BitSets.toIter(window.keys)) {
FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
withins.add(new NamedExpression(fr, fr));
}
for (AggregateCall aggCall : window.getAggregateCalls(this)) {
FieldReference ref = new FieldReference(aggCall.getName());
LogicalExpression expr = toDrill(aggCall, childFields);
aggs.add(new NamedExpression(expr, ref));
}
for (RelFieldCollation fieldCollation : window.orderKeys.getFieldCollations()) {
orderings.add(new Order.Ordering(fieldCollation.getDirection(), new FieldReference(childFields.get(fieldCollation.getFieldIndex())), fieldCollation.nullDirection));
}
WindowPOP windowPOP = new WindowPOP(
childPOP,
withins,
aggs,
orderings,
window.isRows,
WindowPOP.newBound(window.lowerBound),
WindowPOP.newBound(window.upperBound));
creator.addMetadata(this, windowPOP);
return windowPOP;
}
protected LogicalExpression toDrill(AggregateCall call, List<String> fn) {
DrillParseContext context = new DrillParseContext(PrelUtil.getSettings(getCluster()));
List<LogicalExpression> args = Lists.newArrayList();
for (Integer i : call.getArgList()) {
final int indexInConstants = i - fn.size();
if (i < fn.size()) {
args.add(new FieldReference(fn.get(i)));
} else {
final RexLiteral constant = constants.get(indexInConstants);
LogicalExpression expr = DrillOptiq.toDrill(context, getInput(), constant);
args.add(expr);
}
}
// for count(1).
if (args.isEmpty()) {
args.add(new ValueExpressions.LongExpression(1l));
}
return new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN);
}
@Override
public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
return logicalVisitor.visitPrel(this, value);
}
@Override
public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
return BatchSchema.SelectionVectorMode.DEFAULT;
}
@Override
public BatchSchema.SelectionVectorMode getEncoding() {
return BatchSchema.SelectionVectorMode.NONE;
}
@Override
public boolean needsFinalColumnReordering() {
return false;
}
@Override
public Iterator<Prel> iterator() {
return PrelUtil.iter(getInput());
}
/**
* Derive rowType for the copied WindowPrel based on input.
* When copy() is called, the input might be different from the current one's input.
* We have to use the new input's field in the copied WindowPrel.
*/
private RelDataType deriveCopiedRowTypeFromInput(final RelNode input) {
final RelDataType inputRowType = input.getRowType();
final RelDataType windowRowType = this.getRowType();
final List<RelDataTypeField> fieldList = new ArrayList<>(inputRowType.getFieldList());
final int inputFieldCount = inputRowType.getFieldCount();
final int windowFieldCount = windowRowType.getFieldCount();
for (int i = inputFieldCount; i < windowFieldCount; i++) {
fieldList.add(windowRowType.getFieldList().get(i));
}
final RelDataType rowType = this.getCluster().getRexBuilder().getTypeFactory().createStructType(fieldList);
return rowType;
}
}