| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.base.filter; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.calcite.plan.RelOptRuleCall; |
| import org.apache.calcite.plan.RelOptRuleOperand; |
| import org.apache.calcite.plan.RelOptUtil; |
| import org.apache.calcite.rel.RelNode; |
| import org.apache.calcite.rex.RexNode; |
| import org.apache.calcite.rex.RexUtil; |
| import org.apache.calcite.util.Pair; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.exec.physical.base.GroupScan; |
| import org.apache.drill.exec.planner.common.DrillRelOptUtil; |
| import org.apache.drill.exec.planner.logical.DrillFilterRel; |
| import org.apache.drill.exec.planner.logical.DrillOptiq; |
| import org.apache.drill.exec.planner.logical.DrillParseContext; |
| import org.apache.drill.exec.planner.logical.DrillProjectRel; |
| import org.apache.drill.exec.planner.logical.DrillScanRel; |
| import org.apache.drill.exec.planner.logical.RelOptHelper; |
| import org.apache.drill.exec.planner.physical.FilterPrel; |
| import org.apache.drill.exec.planner.physical.PrelUtil; |
| import org.apache.drill.exec.store.StoragePluginOptimizerRule; |
| import org.apache.drill.exec.store.base.filter.ExprNode.AndNode; |
| import org.apache.drill.exec.store.base.filter.FilterPushDownListener.ScanPushDownListener; |
| import com.google.common.collect.ImmutableSet; |
| |
| /** |
| * Generalized filter push-down strategy which performs all the tree-walking |
| * and tree restructuring work, allowing a "listener" to do the work needed |
| * for a particular scan. |
| * <p> |
| * General usage in a storage plugin: <code><pre> |
| * public Set<StoragePluginOptimizerRule> getOptimizerRules( |
| * OptimizerRulesContext optimizerRulesContext, PlannerPhase phase) { |
| * switch (phase) { |
| * case PHYSICAL: |
| * return FilterPushDownStrategy.rulesFor(optimizerRulesContext, |
| * new MyPushDownListener(...)); |
| * ... |
| * } |
| * } |
| * </pre></code> |
| */ |
| public class FilterPushDownStrategy { |
| |
| private static final Collection<String> BANNED_OPERATORS = |
| Collections.singletonList("flatten"); |
| |
| /** |
| * Base rule that passes target information to the push-down strategy |
| */ |
| private static abstract class AbstractFilterPushDownRule extends StoragePluginOptimizerRule { |
| |
| protected final FilterPushDownStrategy strategy; |
| |
| public AbstractFilterPushDownRule(RelOptRuleOperand operand, String description, |
| FilterPushDownStrategy strategy) { |
| super(operand, description); |
| this.strategy = strategy; |
| } |
| } |
| |
| /** |
| * Custom rule passed to Calcite for FILTER --> PROJECT --> SCAN |
| */ |
| private static class ProjectAndFilterRule extends AbstractFilterPushDownRule { |
| |
| private ProjectAndFilterRule(FilterPushDownStrategy strategy) { |
| super(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), |
| strategy.namePrefix() + "PushDownFilter:Filter_On_Project", |
| strategy); |
| } |
| |
| @Override |
| public boolean matches(RelOptRuleCall call) { |
| if (!super.matches(call)) { |
| return false; |
| } |
| DrillScanRel scan = call.rel(2); |
| return strategy.isTargetScan(scan); |
| } |
| |
| @Override |
| public void onMatch(RelOptRuleCall call) { |
| DrillFilterRel filterRel = call.rel(0); |
| DrillProjectRel projectRel = call.rel(1); |
| DrillScanRel scanRel = call.rel(2); |
| strategy.onMatch(call, filterRel, projectRel, scanRel); |
| } |
| } |
| |
| /** |
| * Custom rule passed to Calcite to handle FILTER --> SCAN |
| */ |
| private static class FilterWithoutProjectRule extends AbstractFilterPushDownRule { |
| |
| private FilterWithoutProjectRule(FilterPushDownStrategy strategy) { |
| super(RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)), |
| strategy.namePrefix() + "PushDownFilter:Filter_On_Scan", |
| strategy); |
| } |
| |
| @Override |
| public boolean matches(RelOptRuleCall call) { |
| if (!super.matches(call)) { |
| return false; |
| } |
| DrillScanRel scan = call.rel(1); |
| return strategy.isTargetScan(scan); |
| } |
| |
| @Override |
| public void onMatch(RelOptRuleCall call) { |
| DrillFilterRel filterRel = call.rel(0); |
| DrillScanRel scanRel = call.rel(1); |
| strategy.onMatch(call, filterRel, null, scanRel); |
| } |
| } |
| |
| /** |
| * Implement filter push-down for one scan. |
| */ |
| private static class FilterPushDownBuilder { |
| |
| private final RelOptRuleCall call; |
| private final DrillFilterRel filter; |
| private final DrillProjectRel project; |
| private final DrillScanRel scan; |
| private final ScanPushDownListener scanListener; |
| // Predicates which cannot be converted to a filter predicate |
| List<RexNode> nonConvertedPreds = new ArrayList<>(); |
| |
| protected FilterPushDownBuilder(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan, ScanPushDownListener scanListener) { |
| this.call = call; |
| this.filter = filter; |
| this.project = project; |
| this.scan = scan; |
| this.scanListener = scanListener; |
| } |
| |
| void apply() { |
| AndNode cnfNode = sortPredicates(); |
| if (cnfNode == null) { |
| return; |
| } |
| |
| Pair<GroupScan, List<RexNode>> translated = |
| scanListener.transform(cnfNode); |
| |
| // Listener abandoned effort. (Allows a stub early in development.) |
| if (translated == null) { |
| return; |
| } |
| |
| // Listener rejected the DNF terms |
| GroupScan newGroupScan = translated.left; |
| if (newGroupScan == null) { |
| return; |
| } |
| |
| // Gather unqualified and rewritten predicates |
| List<RexNode> remainingPreds = new ArrayList<>(); |
| remainingPreds.addAll(nonConvertedPreds); |
| if (translated.right != null) { |
| remainingPreds.addAll(translated.right); |
| } |
| |
| // Replace the child with the new filter on top of the child/scan |
| call.transformTo(rebuildTree(newGroupScan, remainingPreds)); |
| } |
| |
| private AndNode sortPredicates() { |
| |
| // Get the filter expression |
| RexNode condition; |
| if (project == null) { |
| condition = filter.getCondition(); |
| } else { |
| // get the filter as if it were below the projection. |
| condition = RelOptUtil.pushPastProject(filter.getCondition(), project); |
| } |
| |
| // Skip if no expression or expression is trivial. |
| // This seems to never happen because Calcite optimizes away |
| // any expression of the form WHERE true, 1 = 1 or 0 = 1. |
| if (condition == null || condition.isAlwaysTrue() || condition.isAlwaysFalse()) { |
| return null; |
| } |
| |
| // Get a conjunction of the filter conditions. For each conjunction, if it refers |
| // to ITEM or FLATTEN expression then it cannot be pushed down. Otherwise, it's |
| // qualified to be pushed down. |
| List<RexNode> filterPreds = RelOptUtil.conjunctions( |
| RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition)); |
| |
| DrillParseContext parseContext = new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())); |
| List<ExprNode> conjuncts = new ArrayList<>(); |
| for (RexNode pred : filterPreds) { |
| ExprNode conjunct = identifyCandidate(parseContext, scan, pred); |
| if (conjunct == null) { |
| nonConvertedPreds.add(pred); |
| } else { |
| conjunct.tag(pred); |
| conjuncts.add(conjunct); |
| } |
| } |
| return conjuncts.isEmpty() ? null : new AndNode(conjuncts); |
| } |
| |
| public ExprNode identifyCandidate(DrillParseContext parseContext, DrillScanRel scan, RexNode pred) { |
| if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) != null) { |
| return null; |
| } |
| |
| // Extract an AND term, which may be an OR expression. |
| LogicalExpression drillPredicate = DrillOptiq.toDrill(parseContext, scan, pred); |
| ExprNode expr = drillPredicate.accept(FilterPushDownUtils.REL_OP_EXTRACTOR, null); |
| if (expr == null) { |
| return null; |
| } |
| |
| // Check if each term can be pushed down, and, if so, return a new RelOp |
| // with the value normalized. |
| return scanListener.accept(expr); |
| } |
| |
| /** |
| * Rebuilds the query plan subtree to include any substitutions and removals requested |
| * by the listener. |
| * |
| * @param newGroupScan the optional replacement scan node given by the listener |
| * @param remainingPreds the Calcite predicates which the listener *does not* handle |
| * and which should remain in the plan tree |
| * @return a rebuilt query subtree |
| */ |
| private RelNode rebuildTree(GroupScan newGroupScan, List<RexNode> remainingPreds) { |
| |
| // Rebuild the subtree with transformed nodes. |
| |
| // Scan: new if available, else existing. |
| RelNode newNode; |
| if (newGroupScan == null) { |
| newNode = scan; |
| } else { |
| newNode = new DrillScanRel(scan.getCluster(), scan.getTraitSet(), scan.getTable(), |
| newGroupScan, scan.getRowType(), scan.getColumns()); |
| } |
| |
| // Copy project, if exists |
| if (project != null) { |
| newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode)); |
| } |
| |
| // Add filter, if any predicates remain. |
| if (!remainingPreds.isEmpty()) { |
| |
| // If some of the predicates weren't used in the filter, creates new filter with them |
| // on top of current scan. Excludes the case when all predicates weren't used in the filter. |
| // FILTER(a, b, c) --> SCAN becomes FILTER(a, d) --> SCAN |
| newNode = filter.copy(filter.getTraitSet(), newNode, |
| RexUtil.composeConjunction( |
| filter.getCluster().getRexBuilder(), |
| remainingPreds, |
| true)); |
| } |
| |
| return newNode; |
| } |
| } |
| |
| private final FilterPushDownListener listener; |
| |
| public FilterPushDownStrategy(FilterPushDownListener listener) { |
| this.listener = listener; |
| } |
| |
| public Set<StoragePluginOptimizerRule> rules() { |
| return ImmutableSet.of( |
| new ProjectAndFilterRule(this), |
| new FilterWithoutProjectRule(this)); |
| } |
| |
| public static Set<StoragePluginOptimizerRule> rulesFor( |
| FilterPushDownListener listener) { |
| return new FilterPushDownStrategy(listener).rules(); |
| } |
| |
| private String namePrefix() { return listener.prefix(); } |
| |
| private boolean isTargetScan(DrillScanRel scan) { |
| return listener.isTargetScan(scan.getGroupScan()); |
| } |
| |
| public void onMatch(RelOptRuleCall call, DrillFilterRel filter, DrillProjectRel project, DrillScanRel scan) { |
| |
| // Skip if rule has already been applied. |
| ScanPushDownListener scanListener = listener.builderFor(scan.getGroupScan()); |
| if (scanListener != null) { |
| new FilterPushDownBuilder(call, filter, project, scan, scanListener).apply(); |
| } |
| } |
| } |