blob: 01357faaa6cfc04102bece35e049c0c43e9c11c9 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
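* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.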
* See the License for the specific language governing permissions and
* limitations under the License.
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.PrelUtil;
* Generalized filter push-down strategy which performs all the tree-walking
* and tree restructuring work, allowing a "listener" to do the work needed
* for a particular scan.
* <p>
* General usage in a storage plugin: <code><pre>
* public Set<StoragePluginOptimizerRule> getOptimizerRules(
* OptimizerRulesContext optimizerRulesContext, PlannerPhase phase) {
* switch (phase) {
* case PHYSICAL:
* return FilterPushDownStrategy.rulesFor(
* new MyPushDownListener(...));
* ...
* }
* }
* </pre></code>
public class FilterPushDownStrategy {
private static final Collection<String> BANNED_OPERATORS =
* Base rule that passes target information to the push-down strategy
private static abstract class AbstractFilterPushDownRule extends StoragePluginOptimizerRule {
protected final FilterPushDownStrategy strategy;
public AbstractFilterPushDownRule(RelOptRuleOperand operand, String description,
FilterPushDownStrategy strategy) {
super(operand, description);
this.strategy = strategy;
* Custom rule passed to Calcite for FILTER --> PROJECT --> SCAN
private static class ProjectAndFilterRule extends AbstractFilterPushDownRule {
private ProjectAndFilterRule(FilterPushDownStrategy strategy) {
super(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
strategy.namePrefix() + "PushDownFilter:Filter_On_Project",
public boolean matches(RelOptRuleCall call) {
if (!super.matches(call)) {
return false;
DrillScanRel scan = call.rel(2);
return strategy.isTargetScan(scan);
public void onMatch(RelOptRuleCall call) {
DrillFilterRel filterRel = call.rel(0);
DrillProjectRel projectRel = call.rel(1);
DrillScanRel scanRel = call.rel(2);
strategy.onMatch(call, filterRel, projectRel, scanRel);
* Custom rule passed to Calcite to handle FILTER --> SCAN
private static class FilterWithoutProjectRule extends AbstractFilterPushDownRule {
private FilterWithoutProjectRule(FilterPushDownStrategy strategy) {
super(RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
strategy.namePrefix() + "PushDownFilter:Filter_On_Scan",
public boolean matches(RelOptRuleCall call) {
if (!super.matches(call)) {
return false;
DrillScanRel scan = call.rel(1);
return strategy.isTargetScan(scan);
public void onMatch(RelOptRuleCall call) {
DrillFilterRel filterRel = call.rel(0);
DrillScanRel scanRel = call.rel(1);
strategy.onMatch(call, filterRel, null, scanRel);
* Implement filter push-down for one scan.
// Per-match helper: holds the rule call, the matched filter / project / scan
// nodes and the listener for that scan while the push-down is worked out.
private static class FilterPushDownBuilder {
// Rule call for the current match; used to reach the planner and to
// publish the rewritten subtree via transformTo().
private final RelOptRuleCall call;
// Matched filter node whose condition is analyzed.
private final DrillFilterRel filter;
// Matched project between filter and scan; null for the FILTER --> SCAN pattern.
private final DrillProjectRel project;
// Matched scan node.
private final DrillScanRel scan;
// Scan-specific listener that is offered each candidate predicate.
private final ScanPushDownListener scanListener;
// Predicates which cannot be converted to a filter predicate
List<RexNode> nonConvertedPreds = new ArrayList<>();
/**
 * Captures everything needed to process one rule match.
 *
 * @param call the rule call for the current match
 * @param filter the matched filter node
 * @param project the matched project node, or null when the filter sits
 *        directly on the scan
 * @param scan the matched scan node
 * @param scanListener the listener that evaluates predicates for this scan
 */
protected FilterPushDownBuilder(RelOptRuleCall call, DrillFilterRel filter,
    DrillProjectRel project, DrillScanRel scan, ScanPushDownListener scanListener) {
  // The final field 'call' must be assigned here like the other fields.
  this.call = call;
  this.filter = filter;
  this.project = project;
  this.scan = scan;
  this.scanListener = scanListener;
}
// Drives the push-down for the matched subtree: sorts the filter predicates,
// works with a (GroupScan, remaining predicates) pair and, when a replacement
// scan is available, publishes the rebuilt subtree via call.transformTo().
// NOTE(review): the bodies of the early-exit branches and the right-hand side
// of the `translated` assignment are not visible here; confirm against the
// complete source before relying on this description.
void apply() {
AndNode cnfNode = sortPredicates();
// sortPredicates() returns null for a missing/trivial condition or when no
// conjunct was kept as a push-down candidate.
if (cnfNode == null) {
// NOTE(review): presumably produced by scanListener from cnfNode — confirm.
Pair<GroupScan, List<RexNode>> translated =
// Listener abandoned effort. (Allows a stub early in development.)
if (translated == null) {
// Listener rejected the CNF terms: no replacement group scan was returned.
GroupScan newGroupScan = translated.left;
if (newGroupScan == null) {
// Gather unqualified and rewritten predicates
List<RexNode> remainingPreds = new ArrayList<>();
// translated.right is optional: it may be null.
if (translated.right != null) {
// Replace the child with the new filter on top of the child/scan
call.transformTo(rebuildTree(newGroupScan, remainingPreds));
private AndNode sortPredicates() {
// Get the filter expression
RexNode condition;
if (project == null) {
condition = filter.getCondition();
} else {
// get the filter as if it were below the projection.
condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
// Skip if no expression or expression is trivial.
// This seems to never happen because Calcite optimizes away
// any expression of the form WHERE true, 1 = 1 or 0 = 1.
if (condition == null || condition.isAlwaysTrue() || condition.isAlwaysFalse()) {
return null;
// Get a conjunction of the filter conditions. For each conjunction, if it refers
// to ITEM or FLATTEN expression then it cannot be pushed down. Otherwise, it's
// qualified to be pushed down.
List<RexNode> filterPreds = RelOptUtil.conjunctions(
RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition));
DrillParseContext parseContext = new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner()));
List<ExprNode> conjuncts = new ArrayList<>();
for (RexNode pred : filterPreds) {
ExprNode conjunct = identifyCandidate(parseContext, scan, pred);
if (conjunct == null) {
} else {
return conjuncts.isEmpty() ? null : new AndNode(conjuncts);
public ExprNode identifyCandidate(DrillParseContext parseContext, DrillScanRel scan, RexNode pred) {
if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) != null) {
return null;
// Extract an AND term, which may be an OR expression.
LogicalExpression drillPredicate = DrillOptiq.toDrill(parseContext, scan, pred);
ExprNode expr = drillPredicate.accept(FilterPushDownUtils.REL_OP_EXTRACTOR, null);
if (expr == null) {
return null;
// Check if each term can be pushed down, and, if so, return a new RelOp
// with the value normalized.
return scanListener.accept(expr);
* Rebuilds the query plan subtree to include any substitutions and removals requested
* by the listener.
* @param newGroupScan the optional replacement scan node given by the listener
* @param remainingPreds the Calcite predicates which the listener *does not* handle
* and which should remain in the plan tree
* @return a rebuilt query subtree
// Builds the replacement subtree bottom-up: scan, then the optional project,
// then a filter for whatever predicates remain.
private RelNode rebuildTree(GroupScan newGroupScan, List<RexNode> remainingPreds) {
// Rebuild the subtree with transformed nodes.
// Scan: new if available, else existing.
RelNode newNode;
if (newGroupScan == null) {
newNode = scan;
} else {
// Same cluster, traits, table, row type and columns as the matched scan;
// only the group scan is swapped.
newNode = new DrillScanRel(scan.getCluster(), scan.getTraitSet(), scan.getTable(),
newGroupScan, scan.getRowType(), scan.getColumns());
// Copy project, if exists
if (project != null) {
// Re-parent the project onto the (possibly new) scan, keeping its traits.
newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
// Add filter, if any predicates remain.
if (!remainingPreds.isEmpty()) {
// If some of the predicates weren't used in the filter, creates new filter with them
// on top of current scan. Excludes the case when all predicates weren't used in the filter.
// FILTER(a, b, c) --> SCAN becomes FILTER(a, d) --> SCAN
// NOTE(review): the condition argument of filter.copy(...) is not visible
// here; presumably the conjunction of remainingPreds — confirm.
newNode = filter.copy(filter.getTraitSet(), newNode,
return newNode;
private final FilterPushDownListener listener;
/**
 * Creates a strategy that delegates all plugin-specific decisions to the
 * given listener.
 *
 * @param listener the plugin-provided push-down listener
 */
public FilterPushDownStrategy(FilterPushDownListener listener) {
  this.listener = listener;
}
public Set<StoragePluginOptimizerRule> rules() {
return ImmutableSet.of(
new ProjectAndFilterRule(this),
new FilterWithoutProjectRule(this));
public static Set<StoragePluginOptimizerRule> rulesFor(
FilterPushDownListener listener) {
return new FilterPushDownStrategy(listener).rules();
/**
 * @return the listener-supplied prefix used when naming the rules
 */
private String namePrefix() {
  return listener.prefix();
}
private boolean isTargetScan(DrillScanRel scan) {
return listener.isTargetScan(scan.getGroupScan());
public void onMatch(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan) {
// Skip if rule has already been applied.
ScanPushDownListener scanListener = listener.builderFor(scan.getGroupScan());
if (scanListener != null) {
new FilterPushDownBuilder(call, filter, project, scan, scanListener).apply();