blob: cc10225a66feb590ddd3e7d53672e0f00c76ba5d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.rule;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.volcano.RelSubset;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.SingleRel;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Correlate;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRelType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexFieldAccess;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
/**
 * A {@link RelOptRule} that replaces a {@link LogicalCorrelate} whose right-hand input is an {@link
 * Uncollect} (optionally wrapped in one {@link SingleRel}, e.g. a projection) with a {@link
 * BeamUnnestRel}.
 *
 * <p>The rule only fires when all of the following hold; otherwise it leaves the plan untouched:
 *
 * <ul>
 *   <li>the correlation id is {@code 0} (only one level of correlation nesting is supported);
 *   <li>exactly one column of the outer input is required by the correlate;
 *   <li>the join type is {@link JoinRelType#INNER};
 *   <li>the input of the {@link Uncollect} is a {@link LogicalProject} with a single expression,
 *       and that expression is a non-nested {@link RexFieldAccess}.
 * </ul>
 */
public class BeamUnnestRule extends RelOptRule {
  public static final BeamUnnestRule INSTANCE = new BeamUnnestRule();

  // TODO: more general Correlate
  private BeamUnnestRule() {
    super(
        operand(
            LogicalCorrelate.class, operand(RelNode.class, any()), operand(SingleRel.class, any())),
        "BeamUnnestRule");
  }

  /**
   * Rewrites the matched correlate into a {@link BeamUnnestRel} when the pattern is one this rule
   * supports; returns without transforming otherwise.
   *
   * @param call the rule call; rel 0 is the {@link LogicalCorrelate}, rel 1 its left (outer) input
   *     and rel 2 its right input (a {@link SingleRel})
   */
  @Override
  public void onMatch(RelOptRuleCall call) {
    LogicalCorrelate correlate = call.rel(0);
    RelNode outer = call.rel(1);
    RelNode right = call.rel(2);

    if (correlate.getCorrelationId().getId() != 0) {
      // Only one level of correlation nesting is supported
      return;
    }
    if (correlate.getRequiredColumns().cardinality() != 1) {
      // can only unnest a single column
      return;
    }
    if (correlate.getJoinType() != JoinRelType.INNER) {
      return;
    }

    RelNode uncollect = right;
    if (!(uncollect instanceof Uncollect)) {
      // Drop projection: the Uncollect may sit one level below the matched SingleRel.
      uncollect = unwrapSubset(((SingleRel) uncollect).getInput());
      if (!(uncollect instanceof Uncollect)) {
        return;
      }
    }

    RelNode project = unwrapSubset(((Uncollect) uncollect).getInput());
    if (!(project instanceof LogicalProject)) {
      return;
    }
    LogicalProject logicalProject = (LogicalProject) project;
    if (logicalProject.getProjects().size() != 1) {
      // can only unnest a single column
      return;
    }

    RexNode exp = logicalProject.getProjects().get(0);
    if (!(exp instanceof RexFieldAccess)) {
      return;
    }
    RexFieldAccess fieldAccess = (RexFieldAccess) exp;
    if (fieldAccess.getReferenceExpr() instanceof RexFieldAccess) {
      // Nested access (a field of a field): getField().getIndex() would be the position inside
      // the inner struct, not a column of the outer row, so a single index cannot describe it.
      return;
    }
    int fieldIndex = fieldAccess.getField().getIndex();

    call.transformTo(
        new BeamUnnestRel(
            correlate.getCluster(),
            correlate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
            outer,
            // Row type of the originally matched right input, not of the unwrapped Uncollect.
            right.getRowType(),
            fieldIndex));
  }

  /**
   * Returns the original rel of {@code rel} when it is a planner {@link RelSubset}, and {@code rel}
   * itself otherwise. Callers follow this with an {@code instanceof} check, so a {@code null}
   * result simply fails that check.
   */
  private static RelNode unwrapSubset(RelNode rel) {
    return rel instanceof RelSubset ? ((RelSubset) rel).getOriginal() : rel;
  }
}