/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;
import org.apache.calcite.rel.core.Filter;
import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
import org.apache.drill.exec.expr.FilterPredicate;
import com.google.common.base.Stopwatch;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Optimizer rule that tries to apply a filter condition to the group scan underneath it.
 *
 * <p>Two concrete variants are provided through static factories: one matching a
 * {@link FilterPrel} directly on top of a {@link ScanPrel}, and one matching a
 * {@link FilterPrel} on top of a {@link ProjectPrel} on top of a {@link ScanPrel}.
 * Both delegate the actual work to {@link #doOnMatch}.
 */
public abstract class FilePushDownFilter extends StoragePluginOptimizerRule {

  private static final Logger logger = LoggerFactory.getLogger(FilePushDownFilter.class);

  /** Operator names that disqualify a predicate from being pushed into the scan. */
  private static final Collection<String> BANNED_OPERATORS = Collections.singletonList("flatten");

  protected final OptimizerRulesContext optimizerContext;

  private FilePushDownFilter(RelOptRuleOperand operand, String id, OptimizerRulesContext optimizerContext) {
    super(operand, id);
    this.optimizerContext = optimizerContext;
  }

  /**
   * Creates the rule variant that matches Filter -> Project -> Scan.
   *
   * @param optimizerRulesContext context handed to the group scan when the filter is applied
   * @return rule instance named "FilePushDownFilter:Filter_On_Project"
   */
  public static RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) {
    return new FilePushDownFilter(
        RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))),
        "FilePushDownFilter:Filter_On_Project", optimizerRulesContext) {

      @Override
      public boolean matches(RelOptRuleCall call) {
        final ScanPrel scan = call.rel(2);
        // The rule is only applicable when the group scan declares filter pushdown support.
        return scan.getGroupScan().supportsFilterPushDown() && super.matches(call);
      }

      @Override
      public void onMatch(RelOptRuleCall call) {
        final FilterPrel filterRel = call.rel(0);
        final ProjectPrel projectRel = call.rel(1);
        final ScanPrel scanRel = call.rel(2);
        doOnMatch(call, filterRel, projectRel, scanRel);
      }
    };
  }

  /**
   * Creates the rule variant that matches Filter -> Scan.
   *
   * @param optimizerContext context handed to the group scan when the filter is applied
   * @return rule instance named "FilePushDownFilter:Filter_On_Scan"
   */
  public static StoragePluginOptimizerRule getFilterOnScan(OptimizerRulesContext optimizerContext) {
    return new FilePushDownFilter(
        RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
        "FilePushDownFilter:Filter_On_Scan", optimizerContext) {

      @Override
      public boolean matches(RelOptRuleCall call) {
        final ScanPrel scan = call.rel(1);
        // The rule is only applicable when the group scan declares filter pushdown support.
        return scan.getGroupScan().supportsFilterPushDown() && super.matches(call);
      }

      @Override
      public void onMatch(RelOptRuleCall call) {
        final FilterPrel filterRel = call.rel(0);
        final ScanPrel scanRel = call.rel(1);
        doOnMatch(call, filterRel, null, scanRel);
      }
    };
  }

  /**
   * Attempts to apply the filter condition to the scan's group scan and, depending on the outcome,
   * replaces the matched sub-tree with one of:
   * <ul>
   *   <li>the scan (or project) alone, when the filter is fully covered;</li>
   *   <li>a filter holding only the predicates that could not be converted;</li>
   *   <li>the original filter on top of a new scan built from the filtered group scan.</li>
   * </ul>
   * Returns without transforming the plan when nothing can be pushed down.
   *
   * @param call    rule call used to obtain planner settings and to register the transformation
   * @param filter  matched filter
   * @param project matched project between filter and scan, or {@code null} if the filter sits on the scan
   * @param scan    matched scan
   */
  protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel project, ScanPrel scan) {
    // matches() only verifies supportsFilterPushDown(); guard the cast so that a group scan of
    // another type makes the rule a no-op instead of failing planning with a ClassCastException.
    if (!(scan.getGroupScan() instanceof AbstractGroupScanWithMetadata)) {
      return;
    }
    AbstractGroupScanWithMetadata<?> groupScan = (AbstractGroupScanWithMetadata<?>) scan.getGroupScan();

    // A non-trivial filter is already attached to the group scan: nothing more to do.
    if (groupScan.getFilter() != null && !groupScan.getFilter().equals(ValueExpressions.BooleanExpression.TRUE)) {
      return;
    }

    RexNode condition;
    if (project == null) {
      condition = filter.getCondition();
    } else {
      // get the filter as if it were below the projection.
      condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
    }

    if (condition == null || condition.isAlwaysTrue()) {
      return;
    }

    // Get the conjunctions of the filter condition. A conjunct that refers to a banned operator
    // (FLATTEN) cannot be pushed down; any other conjunct is qualified to be pushed down.
    // The CNF conversion is limited to 100 nodes to avoid exponential growth of the node count
    // and a subsequent OOM.
    final List<RexNode> predList =
        RelOptUtil.conjunctions(RexUtil.toCnf(filter.getCluster().getRexBuilder(), 100, condition));

    final List<RexNode> qualifiedPredList = new ArrayList<>();

    // list of predicates which cannot be converted to filter predicate
    final List<RexNode> nonConvertedPredList = new ArrayList<>();

    for (RexNode pred : predList) {
      if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) != null) {
        nonConvertedPredList.add(pred);
        continue;
      }

      LogicalExpression drillPredicate = toDrillExpression(call, scan, pred);

      // checks whether predicate may be used for filter pushdown
      FilterPredicate<?> filterPredicate = groupScan.getFilterPredicate(
          drillPredicate,
          optimizerContext,
          optimizerContext.getFunctionRegistry(),
          optimizerContext.getPlannerSettings().getOptions(),
          false);

      // Predicates that contain expressions unsupported for filter pushdown are collected
      // so a residual filter can be built from them; they still take part in the qualified
      // condition passed to the group scan.
      if (filterPredicate == null) {
        nonConvertedPredList.add(pred);
      }
      qualifiedPredList.add(pred);
    }

    final RexNode qualifiedPred =
        RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);
    if (qualifiedPred == null) {
      return;
    }

    LogicalExpression conditionExp = toDrillExpression(call, scan, qualifiedPred);

    // Default - pass the original filter expr to (potentially) be used at run-time;
    // it may later be removed or replaced by another filter (see below).
    groupScan.setFilterForRuntime(conditionExp, optimizerContext);

    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
    AbstractGroupScanWithMetadata<?> newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
        optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
    if (timer != null) {
      // Stop first so the reported value covers only applyFilter().
      timer.stop();
      logger.debug("Took {} ms to apply filter. ", timer.elapsed(TimeUnit.MILLISECONDS));
    }

    // For the case when newGroupScan wasn't created, the old one may
    // fully match the filter for the case when row group pruning did not happen.
    if (newGroupScan == null) {
      if (groupScan.isMatchAllMetadata()) {
        RelNode child = project == null ? scan : project;
        if (nonConvertedPredList.isEmpty()) {
          // The current row group fully matches the filter but no pruning happened:
          // remove the filter and disable the run-time filter expression.
          groupScan.setFilterForRuntime(null, optimizerContext);
          call.transformTo(child);
        } else if (nonConvertedPredList.size() == predList.size()) {
          // None of the predicates participated in filter pushdown.
          return;
        } else {
          // Some of the predicates weren't used in the filter: create a new filter with them
          // on top of the current scan.
          // NOTE(review): when a project is present these predicates were derived from the
          // condition rewritten by pushPastProject, yet the new filter is placed above the
          // project - confirm the input references are still valid at that position.
          Filter theNewFilter = residualFilter(filter, child, nonConvertedPredList);
          LogicalExpression filterPredicate = toDrillExpression(call, scan, theNewFilter.getCondition());
          // pass the new filter expr to (potentially) be used at run-time
          groupScan.setFilterForRuntime(filterPredicate, optimizerContext);
          // Replace the child with the new filter on top of the child/scan
          call.transformTo(theNewFilter);
        }
      }
      return;
    }

    RelNode newNode =
        new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
    if (project != null) {
      newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
    }

    if (newGroupScan.isMatchAllMetadata()) {
      // creates filter from the expressions which can't be pushed to the scan
      if (!nonConvertedPredList.isEmpty()) {
        // NOTE(review): same concern as above about predicates rewritten by pushPastProject
        // being placed in a filter above the project - confirm.
        Filter theFilterRel = residualFilter(filter, newNode, nonConvertedPredList);
        LogicalExpression filterPredicate = toDrillExpression(call, scan, theFilterRel.getCondition());
        // pass the new filter expr to (potentially) be used at run-time
        newGroupScan.setFilterForRuntime(filterPredicate, optimizerContext);
        // replace the new node with the new filter on top of that new node
        newNode = theFilterRel;
      }
      call.transformTo(newNode);
      return;
    }

    // The new group scan does not fully match the filter: keep the original filter above it.
    final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
    call.transformTo(newFilter);
  }

  /** Converts a Calcite expression to a Drill logical expression using a fresh parse context. */
  private static LogicalExpression toDrillExpression(RelOptRuleCall call, ScanPrel scan, RexNode rex) {
    return DrillOptiq.toDrill(
        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, rex);
  }

  /** Copies {@code filter} onto {@code input} with the conjunction of {@code predicates} as its condition. */
  private static Filter residualFilter(FilterPrel filter, RelNode input, List<RexNode> predicates) {
    RexNode residualCondition =
        RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), predicates, true);
    return filter.copy(filter.getTraitSet(), input, residualCondition);
  }
}