blob: fc84f65cdb3c8c082a65eb6bcfb8270416465b3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.planner;
import com.google.common.collect.ImmutableList;
import java.util.Map;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import java.util.ArrayList;
import java.util.List;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.translator.JoinInputNode;
import org.apache.samza.sql.translator.JoinInputNode.InputType;
/**
* Planner rule for remote table joins that pushes filters located above or
* within a join node down into the join's child nodes.
* This class is a customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule},
* adapted for remote table joins.
*/
public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
  /** Whether to try to strengthen join-type. */
  private final boolean smart;

  /**
   * IO configs keyed by source; handed to {@code JoinInputNode.getInputType} to
   * classify each join input (local table, remote table, or other).
   */
  Map<String, SqlIOConfig> systemStreamConfigBySource;

  //~ Constructors -----------------------------------------------------------

  /**
   * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and
   * factories.
   *
   * @param operand root operand the rule matches on
   * @param id suffix appended to {@code "SamzaSqlFilterRemoteJoinRule:"} to form the rule description
   * @param smart whether to try to strengthen the join type using the filters above the join
   * @param relBuilderFactory factory for the {@link RelBuilder} used to build the rewritten plan
   * @param systemStreamConfigBySource IO configs used to determine the input type of each join side
   */
  protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
      boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
    super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
    this.smart = smart;
    this.systemStreamConfigBySource = systemStreamConfigBySource;
  }

  //~ Methods ----------------------------------------------------------------

  /**
   * Pushes filter conjuncts into the inputs of {@code join} where that is allowed, and
   * registers the rewritten plan through {@link RelOptRuleCall#transformTo}.
   *
   * <p>Nothing is pushed when either input is a local table, and nothing is ever pushed
   * into a remote table input. Filters are never moved into the join condition.
   *
   * @param call rule call that supplies the builder and planner and receives the result
   * @param filter filter sitting on top of the join; may be {@code null}, in which case only
   *               the join condition is considered
   * @param join join whose inputs are candidates for receiving filters
   */
  protected void perform(RelOptRuleCall call, Filter filter,
      Join join) {
    final List<RexNode> joinFilters =
        RelOptUtil.conjunctions(join.getCondition());

    final JoinInputNode.InputType inputTypeOnLeft =
        JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
    final JoinInputNode.InputType inputTypeOnRight =
        JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);

    // Disable this optimization entirely for queries using a local table.
    final boolean hasLocalTable =
        inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE;

    // There is nothing to optimize on the remote table side as the lookup needs to happen
    // first before filtering.
    final boolean donotOptimizeLeft = hasLocalTable || inputTypeOnLeft == InputType.REMOTE_TABLE;
    final boolean donotOptimizeRight = hasLocalTable || inputTypeOnRight == InputType.REMOTE_TABLE;

    // If there is only the joinRel,
    // make sure it does not match a cartesian product joinRel
    // (with "true" condition), otherwise this rule will be applied
    // again on the new cartesian product joinRel.
    if (filter == null && joinFilters.isEmpty()) {
      return;
    }

    final List<RexNode> aboveFilters =
        filter != null
            ? RelOptUtil.conjunctions(filter.getCondition())
            : new ArrayList<>();
    final ImmutableList<RexNode> origAboveFilters =
        ImmutableList.copyOf(aboveFilters);

    // Simplify Outer Joins
    final JoinRelType origJoinType = join.getJoinType();
    JoinRelType joinType = origJoinType;
    if (smart
        && !origAboveFilters.isEmpty()
        && origJoinType != JoinRelType.INNER) {
      joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
    }

    final List<RexNode> leftFilters = new ArrayList<>();
    final List<RexNode> rightFilters = new ArrayList<>();

    // TODO - add logic to derive additional filters.  E.g., from
    // (t1.a = 1 AND t2.a = 2) OR (t1.b = 3 AND t2.b = 4), you can
    // derive table filters:
    // (t1.a = 1 OR t1.b = 3)
    // (t2.a = 2 OR t2.b = 4)

    // Try to push down above filters. These are typically where clause
    // filters. They can be pushed down if they are not on the NULL
    // generating side.
    // We do not push into join condition as we do not benefit much. There is also correctness issue
    // with remote table as we will not have values for the remote table before the join/lookup.
    // leftFilters and rightFilters are populated in classifyFilters API.
    boolean filterPushed =
        RelOptUtil.classifyFilters(
            join,
            aboveFilters,
            joinType,
            false, // Let's not push into join filter
            !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
            !joinType.generatesNullsOnRight() && !donotOptimizeRight,
            joinFilters,
            leftFilters,
            rightFilters);

    // If no filter actually landed on either input, reset the filterPushed flag
    if (leftFilters.isEmpty()
        && rightFilters.isEmpty()) {
      filterPushed = false;
    }

    final boolean isAntiJoin = joinType == JoinRelType.ANTI;

    // Try to push down filters in ON clause. A ON clause filter can only be
    // pushed down if it does not affect the non-matching set, i.e. it is
    // not on the side which is preserved.
    // A side is preserved exactly when the OTHER side generates nulls, so the
    // null-generation checks are swapped relative to the where-clause case above:
    // e.g. for a LEFT join an ON clause filter may go to the right input only.
    // A ON clause filter of anti-join can not be pushed down.
    if (!isAntiJoin && RelOptUtil.classifyFilters(
        join,
        joinFilters,
        joinType,
        false,
        !joinType.generatesNullsOnRight() && !donotOptimizeLeft,
        !joinType.generatesNullsOnLeft() && !donotOptimizeRight,
        joinFilters,
        leftFilters,
        rightFilters)) {
      filterPushed = true;
    }

    // if nothing actually got pushed and there is nothing leftover,
    // then this rule is a no-op
    if ((!filterPushed
        && joinType == origJoinType)
        || (joinFilters.isEmpty()
        && leftFilters.isEmpty()
        && rightFilters.isEmpty())) {
      return;
    }

    // create Filters on top of the children if any filters were
    // pushed to them
    final RexBuilder rexBuilder = join.getCluster().getRexBuilder();
    final RelBuilder relBuilder = call.builder();
    final RelNode leftRel = relBuilder.push(join.getLeft()).filter(leftFilters).build();
    final RelNode rightRel = relBuilder.push(join.getRight()).filter(rightFilters).build();

    // create the new join node referencing the new children and
    // containing its new join filters (if there are any)
    final ImmutableList<RelDataType> fieldTypes =
        ImmutableList.<RelDataType>builder()
            .addAll(RelOptUtil.getFieldTypeList(leftRel.getRowType()))
            .addAll(RelOptUtil.getFieldTypeList(rightRel.getRowType())).build();
    final RexNode joinFilter =
        RexUtil.composeConjunction(rexBuilder,
            RexUtil.fixUp(rexBuilder, joinFilters, fieldTypes));

    // If nothing actually got pushed and there is nothing leftover,
    // then this rule is a no-op
    if (joinFilter.isAlwaysTrue()
        && leftFilters.isEmpty()
        && rightFilters.isEmpty()
        && joinType == origJoinType) {
      return;
    }

    final RelNode newJoinRel =
        join.copy(
            join.getTraitSet(),
            joinFilter,
            leftRel,
            rightRel,
            joinType,
            join.isSemiJoinDone());
    call.getPlanner().onCopy(join, newJoinRel);
    // filter is null when only the join matched; in that case there is no
    // source filter node to report as the origin of the pushed filters.
    if (filter != null && !leftFilters.isEmpty()) {
      call.getPlanner().onCopy(filter, leftRel);
    }
    if (filter != null && !rightFilters.isEmpty()) {
      call.getPlanner().onCopy(filter, rightRel);
    }

    relBuilder.push(newJoinRel);

    // Create a project on top of the join if some of the columns have become
    // NOT NULL due to the join-type getting stricter.
    relBuilder.convert(join.getRowType(), false);

    // create a FilterRel on top of the join if needed
    relBuilder.filter(
        RexUtil.fixUp(rexBuilder, aboveFilters,
            RelOptUtil.getFieldTypeList(relBuilder.peek().getRowType())));

    call.transformTo(relBuilder.build());
  }

  /** Rule that tries to push the stream side of the filter expressions into the input of the join. */
  public static class SamzaSqlFilterIntoRemoteJoinRule extends SamzaSqlFilterRemoteJoinRule {
    /**
     * Creates a rule that matches a {@link Filter} whose input is a {@link Join}.
     *
     * @param smart whether to try to strengthen the join type
     * @param relBuilderFactory factory for the builder used to create the rewritten plan
     * @param systemStreamConfigBySource IO configs used to determine the input type of each join side
     */
    public SamzaSqlFilterIntoRemoteJoinRule(boolean smart,
        RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
      // NOTE(review): the base constructor prepends "SamzaSqlFilterRemoteJoinRule:" as well, so
      // the prefix appears twice in the final description. Left as is so the rule description
      // stays exactly what it was before.
      super(
          operand(Filter.class,
              operand(Join.class, RelOptRule.any())),
          "SamzaSqlFilterRemoteJoinRule:filter", smart, relBuilderFactory, systemStreamConfigBySource);
    }

    /** Extracts the matched filter (rel 0) and join (rel 1) and delegates to {@code perform}. */
    @Override public void onMatch(RelOptRuleCall call) {
      Filter filter = call.rel(0);
      Join join = call.rel(1);
      perform(call, filter, join);
    }
  }
}
// End SamzaSqlFilterRemoteJoinRule.java