/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules.materialize;

import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexPermuteInputsShuttle;
import org.apache.calcite.rex.RexSimplify;
import org.apache.calcite.rex.RexTableInputRef;
import org.apache.calcite.rex.RexTableInputRef.RelTableRef;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.MappingType;
import org.apache.calcite.util.mapping.Mappings;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.immutables.value.Value;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/** Materialized view rewriting for aggregate.
*
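* <p>For example (an illustrative sketch; the {@code emp} table, its columns
* and the {@code mv} view are hypothetical), a materialized view defined as
*
* <blockquote><pre>{@code
* SELECT deptno, SUM(sal) AS s, COUNT(*) AS c
* FROM emp
* GROUP BY deptno
* }</pre></blockquote>
*
* <p>may be used to answer the coarser query
* {@code SELECT SUM(sal) FROM emp} by re-aggregating the view,
* e.g. {@code SELECT SUM(s) FROM mv}.
*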
* @param <C> Configuration type
*/
public abstract class MaterializedViewAggregateRule<C extends MaterializedViewAggregateRule.Config>
extends MaterializedViewRule<C> {
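/** Datetime units, from coarsest to finest, for which FLOOR/CEIL rollup
* expressions are generated when matching timestamp expressions
* (see {@link #generateMapping}). */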
protected static final ImmutableList<TimeUnitRange> SUPPORTED_DATE_TIME_ROLLUP_UNITS =
ImmutableList.of(TimeUnitRange.YEAR, TimeUnitRange.QUARTER, TimeUnitRange.MONTH,
TimeUnitRange.DAY, TimeUnitRange.HOUR, TimeUnitRange.MINUTE,
TimeUnitRange.SECOND, TimeUnitRange.MILLISECOND, TimeUnitRange.MICROSECOND);
/** Creates a MaterializedViewAggregateRule. */
MaterializedViewAggregateRule(C config) {
super(config);
}
@Override protected boolean isValidPlan(@Nullable Project topProject, RelNode node,
RelMetadataQuery mq) {
if (!(node instanceof Aggregate)) {
return false;
}
Aggregate aggregate = (Aggregate) node;
if (aggregate.getGroupType() != Aggregate.Group.SIMPLE) {
// TODO: Rewriting with grouping sets not supported yet
return false;
}
return isValidRelNodePlan(aggregate.getInput(), mq);
}
@Override protected @Nullable ViewPartialRewriting compensateViewPartial(
RelBuilder relBuilder,
RexBuilder rexBuilder,
RelMetadataQuery mq,
RelNode input,
@Nullable Project topProject,
RelNode node,
Set<RelTableRef> queryTableRefs,
EquivalenceClasses queryEC,
@Nullable Project topViewProject,
RelNode viewNode,
Set<RelTableRef> viewTableRefs) {
// Modify view to join with missing tables and add Project on top to reorder columns.
// In turn, modify view plan to join with missing tables before Aggregate operator,
// change Aggregate operator to group by previous grouping columns and columns in
// attached tables, and add a final Project on top.
// We only need to add the missing tables on top of the view and view plan using
// a cartesian product.
// Then the rest of the rewriting algorithm can be executed in the same
// fashion, and if there are predicates between the existing and missing
// tables, the rewriting algorithm will enforce them.
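// For example (illustrative): if the query uses tables {A, B} but the view
// only uses {A}, we join the view (and, in the view plan, the Aggregate
// input) with B through a cartesian product; any join predicates between
// A and B are then enforced as compensation predicates.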
final Set<RelTableRef> extraTableRefs = new HashSet<>();
for (RelTableRef tRef : queryTableRefs) {
if (!viewTableRefs.contains(tRef)) {
// Add to extra tables if table is not part of the view
extraTableRefs.add(tRef);
}
}
Multimap<Class<? extends RelNode>, RelNode> nodeTypes = mq.getNodeTypes(node);
if (nodeTypes == null) {
return null;
}
Collection<RelNode> tableScanNodes = nodeTypes.get(TableScan.class);
if (tableScanNodes == null) {
return null;
}
List<RelNode> newRels = new ArrayList<>();
for (RelTableRef tRef : extraTableRefs) {
int i = 0;
for (RelNode relNode : tableScanNodes) {
TableScan scan = (TableScan) relNode;
if (tRef.getQualifiedName().equals(scan.getTable().getQualifiedName())) {
if (tRef.getEntityNumber() == i++) {
newRels.add(relNode);
break;
}
}
}
}
assert extraTableRefs.size() == newRels.size();
relBuilder.push(input);
for (RelNode newRel : newRels) {
// Add to the view
relBuilder.push(newRel);
relBuilder.join(JoinRelType.INNER, rexBuilder.makeLiteral(true));
}
final RelNode newView = relBuilder.build();
final Aggregate aggregateViewNode = (Aggregate) viewNode;
relBuilder.push(aggregateViewNode.getInput());
int offset = 0;
for (RelNode newRel : newRels) {
// Add to the view plan
relBuilder.push(newRel);
relBuilder.join(JoinRelType.INNER, rexBuilder.makeLiteral(true));
offset += newRel.getRowType().getFieldCount();
}
// Modify aggregate: add grouping columns
ImmutableBitSet.Builder groupSet = ImmutableBitSet.builder();
groupSet.addAll(aggregateViewNode.getGroupSet());
groupSet.addAll(
ImmutableBitSet.range(
aggregateViewNode.getInput().getRowType().getFieldCount(),
aggregateViewNode.getInput().getRowType().getFieldCount() + offset));
final Aggregate newViewNode =
aggregateViewNode.copy(aggregateViewNode.getTraitSet(),
relBuilder.build(), groupSet.build(), null,
aggregateViewNode.getAggCallList());
relBuilder.push(newViewNode);
List<RexNode> nodes = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (topViewProject != null) {
// Insert existing expressions (and shift aggregation arguments),
// then append rest of columns
Mappings.TargetMapping shiftMapping =
Mappings.createShiftMapping(newViewNode.getRowType().getFieldCount(),
0, 0, aggregateViewNode.getGroupCount(),
newViewNode.getGroupCount(), aggregateViewNode.getGroupCount(),
aggregateViewNode.getAggCallList().size());
for (int i = 0; i < topViewProject.getProjects().size(); i++) {
nodes.add(
topViewProject.getProjects().get(i).accept(
new RexPermuteInputsShuttle(shiftMapping, newViewNode)));
fieldNames.add(topViewProject.getRowType().getFieldNames().get(i));
}
for (int i = aggregateViewNode.getRowType().getFieldCount();
i < newViewNode.getRowType().getFieldCount(); i++) {
int idx = i - aggregateViewNode.getAggCallList().size();
nodes.add(rexBuilder.makeInputRef(newViewNode, idx));
fieldNames.add(newViewNode.getRowType().getFieldNames().get(idx));
}
} else {
// Original grouping columns, aggregation columns, then new grouping columns
for (int i = 0; i < newViewNode.getRowType().getFieldCount(); i++) {
int idx;
if (i < aggregateViewNode.getGroupCount()) {
idx = i;
} else if (i < aggregateViewNode.getRowType().getFieldCount()) {
idx = i + offset;
} else {
idx = i - aggregateViewNode.getAggCallList().size();
}
nodes.add(rexBuilder.makeInputRef(newViewNode, idx));
fieldNames.add(newViewNode.getRowType().getFieldNames().get(idx));
}
}
relBuilder.project(nodes, fieldNames, true);
final Project newTopViewProject = (Project) relBuilder.build();
return ViewPartialRewriting.of(newView, newTopViewProject, newViewNode);
}
@Override protected @Nullable RelNode rewriteQuery(
RelBuilder relBuilder,
RexBuilder rexBuilder,
RexSimplify simplify,
RelMetadataQuery mq,
RexNode compensationColumnsEquiPred,
RexNode otherCompensationPred,
@Nullable Project topProject,
RelNode node,
BiMap<RelTableRef, RelTableRef> queryToViewTableMapping,
EquivalenceClasses viewEC, EquivalenceClasses queryEC) {
Aggregate aggregate = (Aggregate) node;
// Our target node is the node below the root, which should expose the maximum
// number of available expressions in the tree so that we maximize the number
// of possible rewritings.
// If a pull program is available, we execute it to maximize rewriting
// opportunities. For instance, such a program might pull up all the
// expressions that are below the aggregate so we can introduce compensation
// filters easily. Whether this is important depends on the planner strategy.
RelNode newAggregateInput = aggregate.getInput(0);
RelNode target = aggregate.getInput(0);
HepProgram unionRewritingPullProgram = config.unionRewritingPullProgram();
if (unionRewritingPullProgram != null) {
final HepPlanner tmpPlanner = new HepPlanner(unionRewritingPullProgram);
tmpPlanner.setRoot(newAggregateInput);
newAggregateInput = tmpPlanner.findBestExp();
target = newAggregateInput.getInput(0);
}
// We need to check that all columns required by compensating predicates
// are contained in the query.
List<RexNode> queryExprs = extractReferences(rexBuilder, target);
if (!compensationColumnsEquiPred.isAlwaysTrue()) {
RexNode newCompensationColumnsEquiPred =
rewriteExpression(rexBuilder, mq, target, target, queryExprs,
queryToViewTableMapping, queryEC, false,
compensationColumnsEquiPred);
if (newCompensationColumnsEquiPred == null) {
// Skip it
return null;
}
compensationColumnsEquiPred = newCompensationColumnsEquiPred;
}
// For the rest, we use the query equivalence classes
if (!otherCompensationPred.isAlwaysTrue()) {
RexNode newOtherCompensationPred =
rewriteExpression(rexBuilder, mq, target, target, queryExprs,
queryToViewTableMapping, viewEC, true,
otherCompensationPred);
if (newOtherCompensationPred == null) {
// Skip it
return null;
}
otherCompensationPred = newOtherCompensationPred;
}
final RexNode queryCompensationPred =
RexUtil.not(
RexUtil.composeConjunction(rexBuilder,
ImmutableList.of(compensationColumnsEquiPred,
otherCompensationPred)));
// Generate query rewriting.
RelNode rewrittenPlan = relBuilder
.push(target)
.filter(simplify.simplifyUnknownAsFalse(queryCompensationPred))
.build();
if (config.unionRewritingPullProgram() != null) {
return aggregate.copy(aggregate.getTraitSet(),
ImmutableList.of(
newAggregateInput.copy(newAggregateInput.getTraitSet(),
ImmutableList.of(rewrittenPlan))));
}
return aggregate.copy(aggregate.getTraitSet(), ImmutableList.of(rewrittenPlan));
}
@Override protected @Nullable RelNode createUnion(RelBuilder relBuilder, RexBuilder rexBuilder,
@Nullable RelNode topProject, RelNode unionInputQuery, RelNode unionInputView) {
// Union
relBuilder.push(unionInputQuery);
relBuilder.push(unionInputView);
relBuilder.union(true);
List<RexNode> exprList = new ArrayList<>(relBuilder.peek().getRowType().getFieldCount());
List<String> nameList = new ArrayList<>(relBuilder.peek().getRowType().getFieldCount());
for (int i = 0; i < relBuilder.peek().getRowType().getFieldCount(); i++) {
// We take the field from unionInputQuery, since it is based on the query.
RelDataTypeField field = unionInputQuery.getRowType().getFieldList().get(i);
exprList.add(
rexBuilder.ensureType(
field.getType(),
rexBuilder.makeInputRef(relBuilder.peek(), i),
true));
nameList.add(field.getName());
}
relBuilder.project(exprList, nameList);
// Rollup aggregate
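// Each aggregate call is replaced by its rollup function, as reported by the
// aggregate function itself; e.g. (illustrative) SUM, MIN, MAX and ANY_VALUE
// roll up to themselves, while COUNT rolls up to SUM0.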
Aggregate aggregate = (Aggregate) unionInputQuery;
final ImmutableBitSet groupSet = ImmutableBitSet.range(aggregate.getGroupCount());
final List<AggCall> aggregateCalls = new ArrayList<>();
for (int i = 0; i < aggregate.getAggCallList().size(); i++) {
AggregateCall aggCall = aggregate.getAggCallList().get(i);
if (aggCall.isDistinct()) {
// Cannot roll up a DISTINCT aggregate
return null;
}
SqlAggFunction rollupAgg = aggCall.getAggregation().getRollup();
if (rollupAgg == null) {
// Cannot roll up this aggregate, bail out
return null;
}
final RexInputRef operand =
rexBuilder.makeInputRef(relBuilder.peek(),
aggregate.getGroupCount() + i);
aggregateCalls.add(
relBuilder.aggregateCall(rollupAgg, operand)
.distinct(aggCall.isDistinct())
.approximate(aggCall.isApproximate())
.as(aggCall.name));
}
RelNode prevNode = relBuilder.peek();
RelNode result = relBuilder
.aggregate(relBuilder.groupKey(groupSet), aggregateCalls)
.build();
if (prevNode == result && groupSet.cardinality() != result.getRowType().getFieldCount()) {
// Aggregate was not inserted but we need to prune columns
result = relBuilder
.push(result)
.project(relBuilder.fields(groupSet))
.build();
}
if (topProject != null) {
// Top project
return topProject.copy(topProject.getTraitSet(), ImmutableList.of(result));
}
// Result
return result;
}
@Override protected @Nullable RelNode rewriteView(
RelBuilder relBuilder,
RexBuilder rexBuilder,
RexSimplify simplify,
RelMetadataQuery mq,
MatchModality matchModality,
boolean unionRewriting,
RelNode input,
@Nullable Project topProject,
RelNode node,
@Nullable Project topViewProject0,
RelNode viewNode,
BiMap<RelTableRef, RelTableRef> queryToViewTableMapping,
EquivalenceClasses queryEC) {
final Aggregate queryAggregate = (Aggregate) node;
final Aggregate viewAggregate = (Aggregate) viewNode;
// Get group by references and aggregate call input references needed
final ImmutableBitSet.Builder indexes = ImmutableBitSet.builder();
final ImmutableBitSet references;
if (topProject != null && !unionRewriting) {
// We have a Project on top, gather only what is needed
final RelOptUtil.InputFinder inputFinder =
new RelOptUtil.InputFinder(new LinkedHashSet<>());
inputFinder.visitEach(topProject.getProjects());
references = inputFinder.build();
for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
indexes.set(queryAggregate.getGroupSet().nth(i));
}
for (int i = 0; i < queryAggregate.getAggCallList().size(); i++) {
if (references.get(queryAggregate.getGroupCount() + i)) {
for (int inputIdx : queryAggregate.getAggCallList().get(i).getArgList()) {
indexes.set(inputIdx);
}
}
}
} else {
// No project on top, all of them are needed
for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
indexes.set(queryAggregate.getGroupSet().nth(i));
}
for (AggregateCall queryAggCall : queryAggregate.getAggCallList()) {
for (int inputIdx : queryAggCall.getArgList()) {
indexes.set(inputIdx);
}
}
references = null;
}
// Create mapping from query columns to view columns
final List<RexNode> rollupNodes = new ArrayList<>();
final Multimap<Integer, Integer> m =
generateMapping(rexBuilder, simplify, mq,
queryAggregate.getInput(), viewAggregate.getInput(), indexes.build(),
queryToViewTableMapping, queryEC, rollupNodes);
if (m == null) {
// Bail out
return null;
}
// All expressions could be mapped. Create the aggregate mapping.
@SuppressWarnings("unused")
int viewAggregateAdditionalFieldCount = rollupNodes.size();
int viewInputFieldCount = viewAggregate.getInput().getRowType().getFieldCount();
int viewInputDifferenceViewFieldCount =
viewAggregate.getRowType().getFieldCount() - viewInputFieldCount;
int viewAggregateTotalFieldCount =
viewAggregate.getRowType().getFieldCount() + rollupNodes.size();
boolean forceRollup = false;
Mapping aggregateMapping =
Mappings.create(MappingType.FUNCTION,
queryAggregate.getRowType().getFieldCount(),
viewAggregateTotalFieldCount);
for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
Collection<Integer> c = m.get(queryAggregate.getGroupSet().nth(i));
for (int j : c) {
if (j >= viewAggregate.getInput().getRowType().getFieldCount()) {
// This is one of the rollup columns
aggregateMapping.set(i, j + viewInputDifferenceViewFieldCount);
forceRollup = true;
} else {
int targetIdx = viewAggregate.getGroupSet().indexOf(j);
if (targetIdx == -1) {
continue;
}
aggregateMapping.set(i, targetIdx);
}
break;
}
if (aggregateMapping.getTargetOpt(i) == -1) {
// It is not part of group by, we bail out
return null;
}
}
boolean containsDistinctAgg = false;
for (Ord<AggregateCall> ord : Ord.zip(queryAggregate.getAggCallList())) {
if (references != null
&& !references.get(queryAggregate.getGroupCount() + ord.i)) {
// Ignore
continue;
}
final AggregateCall queryAggCall = ord.e;
if (queryAggCall.filterArg >= 0) {
// Not supported currently
return null;
}
List<Integer> queryAggCallIndexes = new ArrayList<>();
for (int aggCallIdx : queryAggCall.getArgList()) {
queryAggCallIndexes.add(m.get(aggCallIdx).iterator().next());
}
for (int j = 0; j < viewAggregate.getAggCallList().size(); j++) {
AggregateCall viewAggCall = viewAggregate.getAggCallList().get(j);
if (queryAggCall.getAggregation().getKind() != viewAggCall.getAggregation().getKind()
|| queryAggCall.isDistinct() != viewAggCall.isDistinct()
|| queryAggCall.getArgList().size() != viewAggCall.getArgList().size()
|| queryAggCall.getType() != viewAggCall.getType()
|| viewAggCall.filterArg >= 0) {
// Continue
continue;
}
if (!queryAggCallIndexes.equals(viewAggCall.getArgList())) {
// Continue
continue;
}
aggregateMapping.set(queryAggregate.getGroupCount() + ord.i,
viewAggregate.getGroupCount() + j);
if (queryAggCall.isDistinct()) {
containsDistinctAgg = true;
}
break;
}
}
// To simplify things, create an identity topViewProject if not present.
final Project topViewProject = topViewProject0 != null
? topViewProject0
: (Project) relBuilder.push(viewNode)
.project(relBuilder.fields(), ImmutableList.of(), true)
.build();
// Generate result rewriting
final List<RexNode> additionalViewExprs = new ArrayList<>();
// Multimap is required since a column in the materialized view's project
// could map to multiple columns in the target query
final ImmutableMultimap<Integer, Integer> rewritingMapping;
relBuilder.push(input);
// We create view expressions that will be used in a Project on top of the
// view in case we need to roll up the expression
final List<RexNode> inputViewExprs = new ArrayList<>(relBuilder.fields());
if (forceRollup
|| queryAggregate.getGroupCount() != viewAggregate.getGroupCount()
|| matchModality == MatchModality.VIEW_PARTIAL) {
if (containsDistinctAgg) {
// Cannot roll up a DISTINCT aggregate
return null;
}
// Target is coarser level of aggregation. Generate an aggregate.
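// For example (illustrative): if the view groups by (year, month) while the
// query only groups by (year), we aggregate the view output again on (year),
// rolling up the aggregate calls.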
final ImmutableMultimap.Builder<Integer, Integer> rewritingMappingB =
ImmutableMultimap.builder();
final ImmutableBitSet.Builder groupSetB = ImmutableBitSet.builder();
for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
final int targetIdx = aggregateMapping.getTargetOpt(i);
if (targetIdx == -1) {
// No matching group by column, we bail out
return null;
}
if (targetIdx >= viewAggregate.getRowType().getFieldCount()) {
RexNode targetNode =
rollupNodes.get(targetIdx - viewInputFieldCount
- viewInputDifferenceViewFieldCount);
// We need to roll up this expression
final Multimap<RexNode, Integer> exprsLineage = ArrayListMultimap.create();
for (int r : RelOptUtil.InputFinder.bits(targetNode)) {
final int j = find(viewNode, r);
final int k = find(topViewProject, j);
if (k < 0) {
// No matching column needed for computed expression, bail out
return null;
}
final RexInputRef ref =
relBuilder.with(viewNode.getInput(0), b -> b.field(r));
exprsLineage.put(ref, k);
}
// We create the new node pointing to the index
groupSetB.set(inputViewExprs.size());
rewritingMappingB.put(inputViewExprs.size(), i);
additionalViewExprs.add(
new RexInputRef(targetIdx, targetNode.getType()));
// We need to create the rollup expression
RexNode rollupExpression =
requireNonNull(shuttleReferences(rexBuilder, targetNode, exprsLineage),
() -> "shuttleReferences produced null for targetNode="
+ targetNode + ", exprsLineage=" + exprsLineage);
inputViewExprs.add(rollupExpression);
} else {
// This expression should be referenced directly
final int k = find(topViewProject, targetIdx);
if (k < 0) {
// No matching group by column, we bail out
return null;
}
groupSetB.set(k);
rewritingMappingB.put(k, i);
}
}
final ImmutableBitSet groupSet = groupSetB.build();
final List<AggCall> aggregateCalls = new ArrayList<>();
for (Ord<AggregateCall> ord : Ord.zip(queryAggregate.getAggCallList())) {
final int sourceIdx = queryAggregate.getGroupCount() + ord.i;
if (references != null && !references.get(sourceIdx)) {
// Ignore
continue;
}
final int targetIdx =
aggregateMapping.getTargetOpt(sourceIdx);
if (targetIdx < 0) {
// No matching aggregation column, we bail out
return null;
}
final int k = find(topViewProject, targetIdx);
if (k < 0) {
// No matching aggregation column, we bail out
return null;
}
final AggregateCall queryAggCall = ord.e;
SqlAggFunction rollupAgg = queryAggCall.getAggregation().getRollup();
if (rollupAgg == null) {
// Cannot roll up this aggregate, bail out
return null;
}
rewritingMappingB.put(k,
queryAggregate.getGroupCount() + aggregateCalls.size());
final RexInputRef operand = rexBuilder.makeInputRef(input, k);
aggregateCalls.add(
relBuilder.aggregateCall(rollupAgg, operand)
.approximate(queryAggCall.isApproximate())
.distinct(queryAggCall.isDistinct())
.as(queryAggCall.name));
}
// Create aggregate on top of input
final RelNode prevNode = relBuilder.peek();
if (inputViewExprs.size() > prevNode.getRowType().getFieldCount()) {
relBuilder.project(inputViewExprs);
}
relBuilder
.aggregate(relBuilder.groupKey(groupSet), aggregateCalls);
if (prevNode == relBuilder.peek()
&& groupSet.cardinality() != relBuilder.peek().getRowType().getFieldCount()) {
// Aggregate was not inserted but we need to prune columns
relBuilder.project(relBuilder.fields(groupSet));
}
// We introduce a project on top, as the order of the group by columns is lost.
// Multimap is required since a column in the materialized view's project
// could map to multiple columns in the target query.
rewritingMapping = rewritingMappingB.build();
final ImmutableMultimap<Integer, Integer> inverseMapping = rewritingMapping.inverse();
final List<RexNode> projects = new ArrayList<>();
final ImmutableBitSet.Builder addedProjects = ImmutableBitSet.builder();
for (int i = 0; i < queryAggregate.getGroupCount(); i++) {
final int pos = groupSet.indexOf(inverseMapping.get(i).iterator().next());
addedProjects.set(pos);
projects.add(relBuilder.field(pos));
}
final ImmutableBitSet projectedCols = addedProjects.build();
// We add the aggregate functions that are present in the result to the projection list
for (int i = 0; i < relBuilder.peek().getRowType().getFieldCount(); i++) {
if (!projectedCols.get(i)) {
projects.add(relBuilder.field(i));
}
}
relBuilder.project(projects);
} else {
rewritingMapping = null;
}
// Add query expressions on top. We first map query expressions to view
// expressions. Once we have done that, if the expression can be mapped and
// we have already introduced an operator on top of the input node, we use
// the rewriting mapping to resolve the position of the expression in that
// node.
final RelDataType topRowType;
final List<RexNode> topExprs = new ArrayList<>();
if (topProject != null && !unionRewriting) {
topExprs.addAll(topProject.getProjects());
topRowType = topProject.getRowType();
} else {
// Add all
for (int pos = 0; pos < queryAggregate.getRowType().getFieldCount(); pos++) {
topExprs.add(rexBuilder.makeInputRef(queryAggregate, pos));
}
topRowType = queryAggregate.getRowType();
}
// Available in view.
final Multimap<RexNode, Integer> viewExprs = ArrayListMultimap.create();
addAllIndexed(viewExprs, topViewProject.getProjects());
addAllIndexed(viewExprs, additionalViewExprs);
final List<RexNode> rewrittenExprs = new ArrayList<>(topExprs.size());
for (RexNode expr : topExprs) {
// First map through the aggregate
final RexNode e2 = shuttleReferences(rexBuilder, expr, aggregateMapping);
if (e2 == null) {
// Cannot map expression
return null;
}
// Next map through the last project
final RexNode e3 =
shuttleReferences(rexBuilder, e2, viewExprs,
relBuilder.peek(), rewritingMapping);
if (e3 == null) {
// Cannot map expression
return null;
}
rewrittenExprs.add(e3);
}
return relBuilder
.project(rewrittenExprs)
.convert(topRowType, false)
.build();
}
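/** Adds each element of {@code list} to {@code multimap}, using the element
* as key and its 0-based position in the iteration order as value. */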
private static <K> void addAllIndexed(Multimap<K, Integer> multimap,
Iterable<? extends K> list) {
for (K k : list) {
multimap.put(k, multimap.size());
}
}
/** Given a relational expression with a single input (such as a Project or
* Aggregate) and the ordinal of an input field, returns the ordinal of the
* output field that references the input field, or -1 if the field is not
* propagated.
*
* <p>For example, if {@code rel} is {@code Project(c0, c2)} (on input with
* columns (c0, c1, c2)), then {@code find(rel, 2)} returns 1 (c2);
* {@code find(rel, 1)} returns -1 (because c1 is not projected).
*
* <p>If {@code rel} is {@code Aggregate([0, 2], sum(1))}, then
* {@code find(rel, 2)} returns 1, and {@code find(rel, 1)} returns -1.
*
* @param rel Relational expression
* @param ref Ordinal of input field
* @return Ordinal of output field, or -1
*/
private static int find(RelNode rel, int ref) {
if (rel instanceof Project) {
Project project = (Project) rel;
for (Ord<RexNode> p : Ord.zip(project.getProjects())) {
if (p.e instanceof RexInputRef
&& ((RexInputRef) p.e).getIndex() == ref) {
return p.i;
}
}
}
if (rel instanceof Aggregate) {
Aggregate aggregate = (Aggregate) rel;
int k = aggregate.getGroupSet().indexOf(ref);
if (k >= 0) {
return k;
}
}
return -1;
}
/**
* Generates a mapping from {@code node} expressions to {@code target} expressions.
*
* <p>If any of the expressions cannot be mapped, we return null.
*/
protected @Nullable Multimap<Integer, Integer> generateMapping(
RexBuilder rexBuilder,
RexSimplify simplify,
RelMetadataQuery mq,
RelNode node,
RelNode target,
ImmutableBitSet positions,
BiMap<RelTableRef, RelTableRef> tableMapping,
EquivalenceClasses sourceEC,
List<RexNode> additionalExprs) {
checkArgument(additionalExprs.isEmpty());
Multimap<Integer, Integer> m = ArrayListMultimap.create();
Map<RexTableInputRef, Set<RexTableInputRef>> equivalenceClassesMap =
sourceEC.getEquivalenceClassesMap();
Multimap<RexNode, Integer> exprsLineage = ArrayListMultimap.create();
final List<RexNode> timestampExprs = new ArrayList<>();
for (int i = 0; i < target.getRowType().getFieldCount(); i++) {
Set<RexNode> s = mq.getExpressionLineage(target, rexBuilder.makeInputRef(target, i));
if (s == null) {
// Bail out
continue;
}
// We only support project - filter - join, thus it should map to
// a single expression
final RexNode e = Iterables.getOnlyElement(s);
// Rewrite expr to be expressed on query tables
final RexNode simplified = simplify.simplifyUnknownAsFalse(e);
final RexNode expr =
RexUtil.swapTableColumnReferences(rexBuilder, simplified,
tableMapping.inverse(), equivalenceClassesMap);
exprsLineage.put(expr, i);
SqlTypeName sqlTypeName = expr.getType().getSqlTypeName();
if (sqlTypeName == SqlTypeName.TIMESTAMP
|| sqlTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE
|| sqlTypeName == SqlTypeName.TIMESTAMP_TZ) {
timestampExprs.add(expr);
}
}
// For each expression of a TIMESTAMP type (including TIMESTAMP WITH LOCAL
// TIME ZONE and TIMESTAMP_TZ), we add the possible rollup columns too.
// This way we will be able to match FLOOR(ts to HOUR) to
// FLOOR(ts to DAY) via FLOOR(FLOOR(ts to HOUR) to DAY)
for (RexNode timestampExpr : timestampExprs) {
for (TimeUnitRange value : SUPPORTED_DATE_TIME_ROLLUP_UNITS) {
final SqlFunction[] functions =
{getCeilSqlFunction(value), getFloorSqlFunction(value)};
for (SqlFunction function : functions) {
final RexNode call =
rexBuilder.makeCall(function,
timestampExpr, rexBuilder.makeFlag(value));
// References self-row
final RexNode rewrittenCall =
shuttleReferences(rexBuilder, call, exprsLineage);
if (rewrittenCall == null) {
continue;
}
// We add the CEIL or FLOOR expression to the additional
// expressions, replacing the child expression by the position that
// it references
additionalExprs.add(rewrittenCall);
// Then we simplify the expression and we add it to the expressions
// lineage so we can try to find a match.
final RexNode simplified =
simplify.simplifyUnknownAsFalse(call);
exprsLineage.put(simplified,
target.getRowType().getFieldCount() + additionalExprs.size() - 1);
}
}
}
for (int i : positions) {
Set<RexNode> s = mq.getExpressionLineage(node, rexBuilder.makeInputRef(node, i));
if (s == null) {
// Bail out
return null;
}
// We only support project - filter - join, thus it should map to
// a single expression
final RexNode e = Iterables.getOnlyElement(s);
// Rewrite expr to be expressed on query tables
final RexNode simplified = simplify.simplifyUnknownAsFalse(e);
RexNode targetExpr =
RexUtil.swapColumnReferences(rexBuilder, simplified,
equivalenceClassesMap);
final Collection<Integer> c = exprsLineage.get(targetExpr);
if (!c.isEmpty()) {
for (Integer j : c) {
m.put(i, j);
}
} else {
// If we did not find the expression, try to navigate it
RexNode rewrittenTargetExpr =
shuttleReferences(rexBuilder, targetExpr, exprsLineage);
if (rewrittenTargetExpr == null) {
// Some expressions were not present
return null;
}
m.put(i, target.getRowType().getFieldCount() + additionalExprs.size());
additionalExprs.add(rewrittenTargetExpr);
}
}
return m;
}
/**
* Returns the CEIL function used to roll up a datetime expression to the
* given unit. Subclasses may override it to supply a unit-aware function.
*/
protected SqlFunction getCeilSqlFunction(TimeUnitRange flag) {
return SqlStdOperatorTable.CEIL;
}
/**
* Returns the FLOOR function used to roll up a datetime expression to the
* given unit. Subclasses may override it to supply a unit-aware function.
*/
protected SqlFunction getFloorSqlFunction(TimeUnitRange flag) {
return SqlStdOperatorTable.FLOOR;
}
/**
* Returns the rollup aggregation function for the given aggregation,
* or null if it cannot be rolled up.
*/
@Deprecated // to be removed before 2.0
protected @Nullable SqlAggFunction getRollup(SqlAggFunction aggregation) {
if (aggregation == SqlStdOperatorTable.SUM
|| aggregation == SqlStdOperatorTable.SUM0
|| aggregation instanceof SqlMinMaxAggFunction
|| aggregation == SqlStdOperatorTable.ANY_VALUE) {
return aggregation;
} else if (aggregation == SqlStdOperatorTable.COUNT) {
return SqlStdOperatorTable.SUM0;
} else {
return null;
}
}
@Override public Pair<@Nullable RelNode, RelNode> pushFilterToOriginalViewPlan(RelBuilder builder,
@Nullable RelNode topViewProject, RelNode viewNode, RexNode cond) {
// We add (and push) the filter to the view plan before triggering the rewriting.
// This is useful in case some of the columns can be folded to the same value
// after the filter is added.
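// For example (illustrative): pushing a filter such as deptno = 10 below the
// Aggregate lets AggregateProjectPullUpConstantsRule treat the grouping
// column deptno as a constant and pull it up, simplifying the aggregate
// before matching.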
HepProgramBuilder pushFiltersProgram = new HepProgramBuilder();
if (topViewProject != null) {
pushFiltersProgram.addRuleInstance(config.filterProjectTransposeRule());
}
pushFiltersProgram
.addRuleInstance(config.filterAggregateTransposeRule())
.addRuleInstance(config.aggregateProjectPullUpConstantsRule())
.addRuleInstance(config.projectMergeRule());
final HepPlanner tmpPlanner = new HepPlanner(pushFiltersProgram.build());
// Now that the planner is created, push the node
RelNode topNode = builder
.push(topViewProject != null ? topViewProject : viewNode)
.filter(cond).build();
tmpPlanner.setRoot(topNode);
topNode = tmpPlanner.findBestExp();
RelNode resultTopViewProject = null;
RelNode resultViewNode = null;
while (topNode != null) {
if (topNode instanceof Project) {
if (resultTopViewProject != null) {
// The two projects could not be merged, so we bail out
return Pair.of(topViewProject, viewNode);
}
resultTopViewProject = topNode;
topNode = topNode.getInput(0);
} else if (topNode instanceof Aggregate) {
resultViewNode = topNode;
topNode = null;
} else {
// We move to the child
topNode = topNode.getInput(0);
}
}
return Pair.of(resultTopViewProject, requireNonNull(resultViewNode, "resultViewNode"));
}
/**
* Rule configuration.
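*
* <p>A minimal sketch of overriding one of the default sub-rules (assumes a
* starting {@code config} instance obtained elsewhere; the replacement rule
* shown is illustrative):
*
* <blockquote><pre>{@code
* MaterializedViewAggregateRule.Config config = ...;
* config = config.withProjectMergeRule(CoreRules.PROJECT_MERGE);
* }</pre></blockquote>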
*/
public interface Config extends MaterializedViewRule.Config {
/** Instance of rule to push filter through project. */
@Value.Default default RelOptRule filterProjectTransposeRule() {
return CoreRules.FILTER_PROJECT_TRANSPOSE.config
.withRelBuilderFactory(relBuilderFactory())
.as(FilterProjectTransposeRule.Config.class)
.withOperandFor(Filter.class, filter ->
!RexUtil.containsCorrelation(filter.getCondition()),
Project.class, project -> true)
.withCopyFilter(true)
.withCopyProject(true)
.toRule();
}
/** Sets {@link #filterProjectTransposeRule()}. */
Config withFilterProjectTransposeRule(RelOptRule rule);
/** Instance of rule to push filter through aggregate. */
@Value.Default default RelOptRule filterAggregateTransposeRule() {
return CoreRules.FILTER_AGGREGATE_TRANSPOSE.config
.withRelBuilderFactory(relBuilderFactory())
.as(FilterAggregateTransposeRule.Config.class)
.withOperandFor(Filter.class, Aggregate.class)
.toRule();
}
/** Sets {@link #filterAggregateTransposeRule()}. */
Config withFilterAggregateTransposeRule(RelOptRule rule);
/** Instance of rule to pull up constants into aggregate. */
@Value.Default default RelOptRule aggregateProjectPullUpConstantsRule() {
return AggregateProjectPullUpConstantsRule.Config.DEFAULT
.withRelBuilderFactory(relBuilderFactory())
.withDescription("AggFilterPullUpConstants")
.as(AggregateProjectPullUpConstantsRule.Config.class)
.withOperandFor(Aggregate.class, Filter.class)
.toRule();
}
/** Sets {@link #aggregateProjectPullUpConstantsRule()}. */
Config withAggregateProjectPullUpConstantsRule(RelOptRule rule);
/** Instance of rule to merge project operators. */
@Value.Default default RelOptRule projectMergeRule() {
return CoreRules.PROJECT_MERGE.config
.withRelBuilderFactory(relBuilderFactory())
.toRule();
}
/** Sets {@link #projectMergeRule()}. */
Config withProjectMergeRule(RelOptRule rule);
}
}