blob: 01357faaa6cfc04102bece35e049c0c43e9c11c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.base.filter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.base.filter.ExprNode.AndNode;
import org.apache.drill.exec.store.base.filter.FilterPushDownListener.ScanPushDownListener;
import com.google.common.collect.ImmutableSet;
/**
* Generalized filter push-down strategy which performs all the tree-walking
* and tree restructuring work, allowing a "listener" to do the work needed
* for a particular scan.
* <p>
* General usage in a storage plugin: <code><pre>
* public Set<StoragePluginOptimizerRule> getOptimizerRules(
* OptimizerRulesContext optimizerRulesContext, PlannerPhase phase) {
* switch (phase) {
* case PHYSICAL:
* return FilterPushDownStrategy.rulesFor(optimizerRulesContext,
* new MyPushDownListener(...));
* ...
* }
* }
* </pre></code>
*/
public class FilterPushDownStrategy {
private static final Collection<String> BANNED_OPERATORS =
Collections.singletonList("flatten");
/**
* Base rule that passes target information to the push-down strategy
*/
private static abstract class AbstractFilterPushDownRule extends StoragePluginOptimizerRule {
protected final FilterPushDownStrategy strategy;
public AbstractFilterPushDownRule(RelOptRuleOperand operand, String description,
FilterPushDownStrategy strategy) {
super(operand, description);
this.strategy = strategy;
}
}
/**
* Custom rule passed to Calcite for FILTER --> PROJECT --> SCAN
*/
private static class ProjectAndFilterRule extends AbstractFilterPushDownRule {
private ProjectAndFilterRule(FilterPushDownStrategy strategy) {
super(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
strategy.namePrefix() + "PushDownFilter:Filter_On_Project",
strategy);
}
@Override
public boolean matches(RelOptRuleCall call) {
if (!super.matches(call)) {
return false;
}
DrillScanRel scan = call.rel(2);
return strategy.isTargetScan(scan);
}
@Override
public void onMatch(RelOptRuleCall call) {
DrillFilterRel filterRel = call.rel(0);
DrillProjectRel projectRel = call.rel(1);
DrillScanRel scanRel = call.rel(2);
strategy.onMatch(call, filterRel, projectRel, scanRel);
}
}
/**
* Custom rule passed to Calcite to handle FILTER --> SCAN
*/
private static class FilterWithoutProjectRule extends AbstractFilterPushDownRule {
private FilterWithoutProjectRule(FilterPushDownStrategy strategy) {
super(RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
strategy.namePrefix() + "PushDownFilter:Filter_On_Scan",
strategy);
}
@Override
public boolean matches(RelOptRuleCall call) {
if (!super.matches(call)) {
return false;
}
DrillScanRel scan = call.rel(1);
return strategy.isTargetScan(scan);
}
@Override
public void onMatch(RelOptRuleCall call) {
DrillFilterRel filterRel = call.rel(0);
DrillScanRel scanRel = call.rel(1);
strategy.onMatch(call, filterRel, null, scanRel);
}
}
/**
* Implement filter push-down for one scan.
*/
private static class FilterPushDownBuilder {
private final RelOptRuleCall call;
private final DrillFilterRel filter;
private final DrillProjectRel project;
private final DrillScanRel scan;
private final ScanPushDownListener scanListener;
// Predicates which cannot be converted to a filter predicate
List<RexNode> nonConvertedPreds = new ArrayList<>();
protected FilterPushDownBuilder(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan, ScanPushDownListener scanListener) {
this.call = call;
this.filter = filter;
this.project = project;
this.scan = scan;
this.scanListener = scanListener;
}
void apply() {
AndNode cnfNode = sortPredicates();
if (cnfNode == null) {
return;
}
Pair<GroupScan, List<RexNode>> translated =
scanListener.transform(cnfNode);
// Listener abandoned effort. (Allows a stub early in development.)
if (translated == null) {
return;
}
// Listener rejected the DNF terms
GroupScan newGroupScan = translated.left;
if (newGroupScan == null) {
return;
}
// Gather unqualified and rewritten predicates
List<RexNode> remainingPreds = new ArrayList<>();
remainingPreds.addAll(nonConvertedPreds);
if (translated.right != null) {
remainingPreds.addAll(translated.right);
}
// Replace the child with the new filter on top of the child/scan
call.transformTo(rebuildTree(newGroupScan, remainingPreds));
}
private AndNode sortPredicates() {
// Get the filter expression
RexNode condition;
if (project == null) {
condition = filter.getCondition();
} else {
// get the filter as if it were below the projection.
condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
}
// Skip if no expression or expression is trivial.
// This seems to never happen because Calcite optimizes away
// any expression of the form WHERE true, 1 = 1 or 0 = 1.
if (condition == null || condition.isAlwaysTrue() || condition.isAlwaysFalse()) {
return null;
}
// Get a conjunction of the filter conditions. For each conjunction, if it refers
// to ITEM or FLATTEN expression then it cannot be pushed down. Otherwise, it's
// qualified to be pushed down.
List<RexNode> filterPreds = RelOptUtil.conjunctions(
RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition));
DrillParseContext parseContext = new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner()));
List<ExprNode> conjuncts = new ArrayList<>();
for (RexNode pred : filterPreds) {
ExprNode conjunct = identifyCandidate(parseContext, scan, pred);
if (conjunct == null) {
nonConvertedPreds.add(pred);
} else {
conjunct.tag(pred);
conjuncts.add(conjunct);
}
}
return conjuncts.isEmpty() ? null : new AndNode(conjuncts);
}
public ExprNode identifyCandidate(DrillParseContext parseContext, DrillScanRel scan, RexNode pred) {
if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) != null) {
return null;
}
// Extract an AND term, which may be an OR expression.
LogicalExpression drillPredicate = DrillOptiq.toDrill(parseContext, scan, pred);
ExprNode expr = drillPredicate.accept(FilterPushDownUtils.REL_OP_EXTRACTOR, null);
if (expr == null) {
return null;
}
// Check if each term can be pushed down, and, if so, return a new RelOp
// with the value normalized.
return scanListener.accept(expr);
}
/**
* Rebuilds the query plan subtree to include any substitutions and removals requested
* by the listener.
*
* @param newGroupScan the optional replacement scan node given by the listener
* @param remainingPreds the Calcite predicates which the listener *does not* handle
* and which should remain in the plan tree
* @return a rebuilt query subtree
*/
private RelNode rebuildTree(GroupScan newGroupScan, List<RexNode> remainingPreds) {
// Rebuild the subtree with transformed nodes.
// Scan: new if available, else existing.
RelNode newNode;
if (newGroupScan == null) {
newNode = scan;
} else {
newNode = new DrillScanRel(scan.getCluster(), scan.getTraitSet(), scan.getTable(),
newGroupScan, scan.getRowType(), scan.getColumns());
}
// Copy project, if exists
if (project != null) {
newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
}
// Add filter, if any predicates remain.
if (!remainingPreds.isEmpty()) {
// If some of the predicates weren't used in the filter, creates new filter with them
// on top of current scan. Excludes the case when all predicates weren't used in the filter.
// FILTER(a, b, c) --> SCAN becomes FILTER(a, d) --> SCAN
newNode = filter.copy(filter.getTraitSet(), newNode,
RexUtil.composeConjunction(
filter.getCluster().getRexBuilder(),
remainingPreds,
true));
}
return newNode;
}
}
private final FilterPushDownListener listener;
public FilterPushDownStrategy(FilterPushDownListener listener) {
this.listener = listener;
}
public Set<StoragePluginOptimizerRule> rules() {
return ImmutableSet.of(
new ProjectAndFilterRule(this),
new FilterWithoutProjectRule(this));
}
public static Set<StoragePluginOptimizerRule> rulesFor(
FilterPushDownListener listener) {
return new FilterPushDownStrategy(listener).rules();
}
private String namePrefix() { return listener.prefix(); }
private boolean isTargetScan(DrillScanRel scan) {
return listener.isTargetScan(scan.getGroupScan());
}
public void onMatch(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan) {
// Skip if rule has already been applied.
ScanPushDownListener scanListener = listener.builderFor(scan.getGroupScan());
if (scanListener != null) {
new FilterPushDownBuilder(call, filter, project, scan, scanListener).apply();
}
}
}