blob: 303a57fce664d30a72f1816ccfc87ed76b7e0820 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalCalc;
import org.apache.calcite.rel.logical.LogicalWindow;
import org.apache.calcite.rex.RexBiVisitorImpl;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexDynamicParam;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexWindow;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.graph.DefaultDirectedGraph;
import org.apache.calcite.util.graph.DefaultEdge;
import org.apache.calcite.util.graph.DirectedGraph;
import org.apache.calcite.util.graph.TopologicalOrderIterator;
import com.google.common.collect.ImmutableList;
import org.immutables.value.Value;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Planner rule that slices a
* {@link org.apache.calcite.rel.core.Project}
* into sections which contain windowed
* aggregate functions and sections which do not.
*
* <p>The sections which contain windowed agg functions become instances of
* {@link org.apache.calcite.rel.logical.LogicalWindow}.
* If the {@link org.apache.calcite.rel.logical.LogicalCalc} does not contain
* any windowed agg functions, does nothing.
*
* <p>There is also a variant that matches
* {@link org.apache.calcite.rel.core.Calc} rather than {@code Project}.
*/
public abstract class ProjectToWindowRule
extends RelRule<ProjectToWindowRule.Config>
implements TransformationRule {
/** Creates a ProjectToWindowRule. */
protected ProjectToWindowRule(Config config) {
super(config);
}
/**
* Instance of the rule that applies to a
* {@link org.apache.calcite.rel.core.Calc} that contains
* windowed aggregates and converts it into a mixture of
* {@link org.apache.calcite.rel.logical.LogicalWindow} and {@code Calc}.
*
* @see CoreRules#CALC_TO_WINDOW
*/
public static class CalcToWindowRule extends ProjectToWindowRule {
/** Creates a CalcToWindowRule. */
protected CalcToWindowRule(CalcToWindowRuleConfig config) {
super(config);
}
@Override public void onMatch(RelOptRuleCall call) {
final Calc calc = call.rel(0);
assert calc.containsOver();
final CalcRelSplitter transform =
new WindowedAggRelSplitter(calc, call.builder());
RelNode newRel = transform.execute();
call.transformTo(newRel);
}
/** Rule configuration. */
@Value.Immutable
public interface CalcToWindowRuleConfig extends ProjectToWindowRule.Config {
CalcToWindowRuleConfig DEFAULT = ImmutableCalcToWindowRuleConfig.of()
.withOperandSupplier(b ->
b.operand(Calc.class)
.predicate(Calc::containsOver)
.anyInputs())
.withDescription("ProjectToWindowRule");
@Override default CalcToWindowRule toRule() {
return new CalcToWindowRule(this);
}
}
}
/**
* Instance of the rule that can be applied to a
* {@link org.apache.calcite.rel.core.Project} and that produces, in turn,
* a mixture of {@code LogicalProject}
* and {@link org.apache.calcite.rel.logical.LogicalWindow}.
*
* @see CoreRules#PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW
*/
public static class ProjectToLogicalProjectAndWindowRule
extends ProjectToWindowRule {
/** Creates a ProjectToLogicalProjectAndWindowRule. */
protected ProjectToLogicalProjectAndWindowRule(
ProjectToLogicalProjectAndWindowRuleConfig config) {
super(config);
}
@Deprecated // to be removed before 2.0
public ProjectToLogicalProjectAndWindowRule(
RelBuilderFactory relBuilderFactory) {
this(ProjectToLogicalProjectAndWindowRuleConfig.DEFAULT
.withRelBuilderFactory(relBuilderFactory)
.as(ProjectToLogicalProjectAndWindowRuleConfig.class));
}
@Override public void onMatch(RelOptRuleCall call) {
Project project = call.rel(0);
assert project.containsOver();
final RelNode input = project.getInput();
final RexProgram program =
RexProgram.create(
input.getRowType(),
project.getProjects(),
null,
project.getRowType(),
project.getCluster().getRexBuilder());
// temporary LogicalCalc, never registered
final LogicalCalc calc = LogicalCalc.create(input, program);
final CalcRelSplitter transform =
new WindowedAggRelSplitter(calc, call.builder()) {
@Override protected RelNode handle(RelNode rel) {
if (!(rel instanceof LogicalCalc)) {
return rel;
}
final LogicalCalc calc = (LogicalCalc) rel;
final RexProgram program = calc.getProgram();
relBuilder.push(calc.getInput());
if (program.getCondition() != null) {
relBuilder.filter(
program.expandLocalRef(program.getCondition()));
}
if (!program.projectsOnlyIdentity()) {
relBuilder.project(
Util.transform(program.getProjectList(),
program::expandLocalRef),
calc.getRowType().getFieldNames());
}
return relBuilder.build();
}
};
RelNode newRel = transform.execute();
call.transformTo(newRel);
}
/** Rule configuration. */
@Value.Immutable
public interface ProjectToLogicalProjectAndWindowRuleConfig
extends ProjectToWindowRule.Config {
ProjectToLogicalProjectAndWindowRuleConfig DEFAULT =
ImmutableProjectToLogicalProjectAndWindowRuleConfig.of()
.withOperandSupplier(b ->
b.operand(Project.class)
.predicate(Project::containsOver)
.anyInputs())
.withDescription("ProjectToWindowRule:project");
@Override default ProjectToLogicalProjectAndWindowRule toRule() {
return new ProjectToLogicalProjectAndWindowRule(this);
}
}
}
/**
* Splitter that distinguishes between windowed aggregation expressions
* (calls to {@link RexOver}) and ordinary expressions.
*/
static class WindowedAggRelSplitter extends CalcRelSplitter {
private static final RelType[] REL_TYPES = {
new RelType("CalcRelType") {
@Override protected boolean canImplement(RexFieldAccess field) {
return true;
}
@Override protected boolean canImplement(RexDynamicParam param) {
return true;
}
@Override protected boolean canImplement(RexLiteral literal) {
return true;
}
@Override protected boolean canImplement(RexCall call) {
return !(call instanceof RexOver);
}
@Override protected RelNode makeRel(RelOptCluster cluster,
RelTraitSet traitSet, RelBuilder relBuilder, RelNode input,
RexProgram program) {
assert !program.containsAggs();
program = program.normalize(cluster.getRexBuilder(), null);
return super.makeRel(cluster, traitSet, relBuilder, input,
program);
}
},
new RelType("WinAggRelType") {
@Override protected boolean canImplement(RexFieldAccess field) {
return false;
}
@Override protected boolean canImplement(RexDynamicParam param) {
return false;
}
@Override protected boolean canImplement(RexLiteral literal) {
return false;
}
@Override protected boolean canImplement(RexCall call) {
return call instanceof RexOver;
}
@Override protected boolean supportsCondition() {
return false;
}
@Override protected RelNode makeRel(RelOptCluster cluster, RelTraitSet traitSet,
RelBuilder relBuilder, RelNode input, RexProgram program) {
checkArgument(program.getCondition() == null,
"WindowedAggregateRel cannot accept a condition");
return LogicalWindow.create(cluster, traitSet, relBuilder, input,
program);
}
}
};
WindowedAggRelSplitter(Calc calc, RelBuilder relBuilder) {
super(calc, relBuilder, REL_TYPES);
}
@Override protected List<Set<Integer>> getCohorts() {
// Two RexOver will be put in the same cohort
// if the following conditions are satisfied
// (1). They have the same RexWindow
// (2). They are not dependent on each other
final List<RexNode> exprs = this.program.getExprList();
final DirectedGraph<Integer, DefaultEdge> graph =
createGraphFromExpression(exprs);
final List<Integer> rank = getRank(graph);
final List<Pair<RexWindow, Set<Integer>>> windowToIndices = new ArrayList<>();
for (int i = 0; i < exprs.size(); ++i) {
final RexNode expr = exprs.get(i);
if (expr instanceof RexOver) {
final RexOver over = (RexOver) expr;
// If we can find an existing cohort which satisfies the two conditions,
// we will add this RexOver into that cohort
boolean isFound = false;
for (Pair<RexWindow, Set<Integer>> pair : windowToIndices) {
// Check the first condition
if (pair.left.equals(over.getWindow())) {
// Check the second condition
boolean hasDependency = false;
for (int ordinal : pair.right) {
if (isDependent(graph, rank, ordinal, i)) {
hasDependency = true;
break;
}
}
if (!hasDependency) {
pair.right.add(i);
isFound = true;
break;
}
}
}
// This RexOver cannot be added into any existing cohort
if (!isFound) {
final Set<Integer> newSet = new HashSet<>(ImmutableList.of(i));
windowToIndices.add(Pair.of(over.getWindow(), newSet));
}
}
}
final List<Set<Integer>> cohorts = new ArrayList<>();
for (Pair<RexWindow, Set<Integer>> pair : windowToIndices) {
cohorts.add(pair.right);
}
return cohorts;
}
private static boolean isDependent(final DirectedGraph<Integer, DefaultEdge> graph,
final List<Integer> rank,
final int ordinal1,
final int ordinal2) {
if (rank.get(ordinal2) > rank.get(ordinal1)) {
return isDependent(graph, rank, ordinal2, ordinal1);
}
// Check if the expression in ordinal1
// could depend on expression in ordinal2 by Depth-First-Search
final Deque<Integer> dfs = new ArrayDeque<>();
final Set<Integer> visited = new HashSet<>();
dfs.push(ordinal2);
while (!dfs.isEmpty()) {
int source = dfs.pop();
if (visited.contains(source)) {
continue;
}
if (source == ordinal1) {
return true;
}
visited.add(source);
for (DefaultEdge e : graph.getOutwardEdges(source)) {
int target = (int) e.target;
if (rank.get(target) <= rank.get(ordinal1)) {
dfs.push(target);
}
}
}
return false;
}
private static List<Integer> getRank(DirectedGraph<Integer, DefaultEdge> graph) {
final int[] rankArr = new int[graph.vertexSet().size()];
int rank = 0;
for (int i : TopologicalOrderIterator.of(graph)) {
rankArr[i] = rank++;
}
return ImmutableIntList.of(rankArr);
}
private static DirectedGraph<Integer, DefaultEdge> createGraphFromExpression(
final List<RexNode> exprs) {
final DirectedGraph<Integer, DefaultEdge> graph =
DefaultDirectedGraph.create();
for (int i = 0; i < exprs.size(); i++) {
graph.addVertex(i);
}
new RexBiVisitorImpl<Void, Integer>(true) {
@Override public Void visitLocalRef(RexLocalRef localRef, Integer i) {
graph.addEdge(localRef.getIndex(), i);
return null;
}
}.visitEachIndexed(exprs);
assert graph.vertexSet().size() == exprs.size();
return graph;
}
}
/** Rule configuration. */
public interface Config extends RelRule.Config {
@Override ProjectToWindowRule toRule();
}
}