blob: cd998a166186fffc898e53b5aa0316afb2dd0a4e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.logical;
import org.apache.flink.table.functions.sql.internal.SqlAuxiliaryGroupAggFunction$;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import java.util.ArrayList;
import java.util.List;
/**
* This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.AggregateRemoveRule}.
* Modification:
* - only matches aggregate with with SIMPLE group and non-empty group
* - supports SUM, MIN, MAX, AUXILIARY_GROUP aggregate functions with no filterArgs
*/
/**
* Planner rule that removes
* a {@link org.apache.calcite.rel.core.Aggregate}
* if its aggregate functions are SUM, MIN, MAX, AUXILIARY_GROUP with no filterArgs,
* and the underlying relational expression is already distinct.
*/
public class FlinkAggregateRemoveRule extends RelOptRule {
public static final FlinkAggregateRemoveRule INSTANCE =
new FlinkAggregateRemoveRule(LogicalAggregate.class,
RelFactories.LOGICAL_BUILDER);
//~ Constructors -----------------------------------------------------------
@Deprecated // to be removed before 2.0
public FlinkAggregateRemoveRule(Class<? extends Aggregate> aggregateClass) {
this(aggregateClass, RelFactories.LOGICAL_BUILDER);
}
/**
* Creates an FlinkAggregateRemoveRule.
*/
public FlinkAggregateRemoveRule(Class<? extends Aggregate> aggregateClass,
RelBuilderFactory relBuilderFactory) {
// REVIEW jvs 14-Mar-2006: We have to explicitly mention the child here
// to make sure the rule re-fires after the child changes (e.g. via
// ProjectRemoveRule), since that may change our information
// about whether the child is distinct. If we clean up the inference of
// distinct to make it correct up-front, we can get rid of the reference
// to the child here.
super(
operand(aggregateClass,
operand(RelNode.class, any())),
relBuilderFactory, null);
}
@Override
public boolean matches(RelOptRuleCall call) {
final Aggregate aggregate = call.rel(0);
final RelNode input = call.rel(1);
if (aggregate.getGroupCount() == 0 || aggregate.indicator ||
aggregate.getGroupType() != Aggregate.Group.SIMPLE) {
return false;
}
for (AggregateCall aggCall : aggregate.getAggCallList()) {
SqlKind aggCallKind = aggCall.getAggregation().getKind();
// TODO supports more AggregateCalls
boolean isAllowAggCall = aggCallKind == SqlKind.SUM ||
aggCallKind == SqlKind.MIN ||
aggCallKind == SqlKind.MAX ||
aggCall.getAggregation() instanceof SqlAuxiliaryGroupAggFunction$;
if (!isAllowAggCall || aggCall.filterArg >= 0 || aggCall.getArgList().size() != 1) {
return false;
}
}
final RelMetadataQuery mq = call.getMetadataQuery();
return SqlFunctions.isTrue(mq.areColumnsUnique(input, aggregate.getGroupSet()));
}
//~ Methods ----------------------------------------------------------------
public void onMatch(RelOptRuleCall call) {
final Aggregate aggregate = call.rel(0);
final RelNode input = call.rel(1);
// Distinct is "GROUP BY c1, c2" (where c1, c2 are a set of columns on
// which the input is unique, i.e. contain a key) and has no aggregate
// functions or the functions we enumerated. It can be removed.
final RelNode newInput = convert(input, aggregate.getTraitSet().simplify());
// If aggregate was projecting a subset of columns, add a project for the
// same effect.
final RelBuilder relBuilder = call.builder();
relBuilder.push(newInput);
List<Integer> projectIndices = new ArrayList<>(aggregate.getGroupSet().asList());
for (AggregateCall aggCall : aggregate.getAggCallList()) {
projectIndices.addAll(aggCall.getArgList());
}
relBuilder.project(relBuilder.fields(projectIndices));
// Create a project if some of the columns have become
// NOT NULL due to aggregate functions are removed
relBuilder.convert(aggregate.getRowType(), true);
call.transformTo(relBuilder.build());
}
}
// End FlinkAggregateRemoveRule.java