blob: fd7ba211216587617d733c9a5ef8138db4000139 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.RelOptRuleCall;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.plan.volcano.RelSubset;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.SingleRel;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Aggregate;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Filter;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.Project;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.core.RelFactories;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.rules.AggregateProjectMergeRule;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.tools.RelBuilderFactory;
/**
* This rule is essentially a wrapper around Calcite's {@code AggregateProjectMergeRule}. In the
* case when an underlying IO supports project push-down it is more efficient to not merge {@code
* Project} with an {@code Aggregate}, leaving it for the {@code BeamIOPUshDownRule}.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class BeamAggregateProjectMergeRule extends AggregateProjectMergeRule {
public static final AggregateProjectMergeRule INSTANCE =
new BeamAggregateProjectMergeRule(
Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
public BeamAggregateProjectMergeRule(
Class<? extends Aggregate> aggregateClass,
Class<? extends Project> projectClass,
RelBuilderFactory relBuilderFactory) {
super(aggregateClass, projectClass, relBuilderFactory);
}
@Override
public void onMatch(RelOptRuleCall call) {
final Project project = call.rel(1);
BeamIOSourceRel io = getUnderlyingIO(new HashSet<>(), project);
// Only perform AggregateProjectMergeRule when IO is not present or project push-down is not
// supported.
if (io == null || !io.getBeamSqlTable().supportsProjects().isSupported()) {
super.onMatch(call);
}
}
/**
* Following scenarios are possible:<br>
* 1) Aggregate <- Project <- IO.<br>
* 2) Aggregate <- Project <- Chain of Project/Filter <- IO.<br>
* 3) Aggregate <- Project <- Something else.<br>
* 4) Aggregate <- Project <- Chain of Project/Filter <- Something else.
*
* @param parent project that matched this rule.
* @return {@code BeamIOSourceRel} when it is present or null when some other {@code RelNode} is
* present.
*/
private BeamIOSourceRel getUnderlyingIO(Set<RelNode> visitedNodes, SingleRel parent) {
// No need to look at the same node more than once.
if (visitedNodes.contains(parent)) {
return null;
}
visitedNodes.add(parent);
List<RelNode> nodes = ((RelSubset) parent.getInput()).getRelList();
for (RelNode node : nodes) {
if (node instanceof Filter || node instanceof Project) {
// Search node inputs for an IO.
BeamIOSourceRel child = getUnderlyingIO(visitedNodes, (SingleRel) node);
if (child != null) {
return child;
}
} else if (node instanceof BeamIOSourceRel) {
return (BeamIOSourceRel) node;
}
}
return null;
}
}