blob: 957f514960ada3cc2a26f6863c4d148efdaa3379 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.logical.partition;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;
import java.util.List;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexDynamicParam;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexRangeRef;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlBinaryOperator;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.util.Util;
import com.google.common.collect.Lists;
public class FindPartitionConditions extends RexVisitorImpl<Void> {
/** Whether an expression is a directory filter, and if so, whether
* it can be pushed into the scan.
*/
enum PushDirFilter {
NO_PUSH, PUSH, PARTIAL_PUSH
}
/**
* During top-down traversal of the expression tree, keep track of the
* current operators that are directory filters. Children that are
* directory filters add themselves to their parent operators.
*
* NOTE: this auxiliary class is necessary because RexNodes are immutable.
* If they were mutable, we could have easily added/dropped inputs as we
* encountered directory filters.
*/
public class OpState {
private SqlOperator sqlOperator;
private List<RexNode> children = Lists.newArrayList();
public OpState(SqlOperator op) {
sqlOperator = op;
}
public SqlOperator getOp() {
return sqlOperator;
}
public void addChild(RexNode n) {
if (!children.contains(n)) {
children.add(n);
}
}
public List<RexNode> getChildren() {
return children;
}
public void clear() {
children.clear();
}
}
private final BitSet dirs;
// The Scan could be projecting several dirN columns but we are only interested in the
// ones that are referenced by the Filter, so keep track of such referenced dirN columns.
private final BitSet referencedDirs;
private final List<PushDirFilter> pushStatusStack = Lists.newArrayList();
private final Deque<OpState> opStack = new ArrayDeque<OpState>();
/* While traversing the filter tree, some RexCalls need special handling where
* a partial expression cannot be built bottom up and the whole expression needs
* to be built. Eg: cast(VALUE/COLUMN as Type). We need to treat this as holistic
* expression and cannot treat the VALUE/COLUMN to be casted as a separate expression
* Below count keeps track of expressions that need such handling. Its type is an integer
* and not a boolean because we can have such nested expressions.
*/
private int holisticExpression = 0;
private RexBuilder builder = null;
private RexNode resultCondition = null;
public FindPartitionConditions(BitSet dirs) {
// go deep
super(true);
this.dirs = dirs;
this.referencedDirs = new BitSet(dirs.size());
}
public FindPartitionConditions(BitSet dirs, RexBuilder builder) {
// go deep
super(true);
this.dirs = dirs;
this.builder = builder;
this.referencedDirs = new BitSet(dirs.size());
}
public void analyze(RexNode exp) {
assert pushStatusStack.isEmpty();
exp.accept(this);
// Deal with top of stack
assert pushStatusStack.size() == 1;
PushDirFilter rootPushDirFilter = pushStatusStack.get(0);
if (rootPushDirFilter == PushDirFilter.PUSH) {
// The entire subtree was directory filter, so add it to the result.
addResult(exp);
}
pushStatusStack.clear();
}
public RexNode getFinalCondition() {
return resultCondition;
}
public BitSet getReferencedDirs() {
return referencedDirs;
}
private Void pushVariable() {
pushStatusStack.add(PushDirFilter.NO_PUSH);
return null;
}
private void addResult(RexNode exp) {
// processing a special expression, will be handled holistically by the parent
if (holisticExpression > 0) {
return;
}
// when we find a directory filter, add it to the current operator's
// children (if one exists)
if (!opStack.isEmpty()) {
OpState op = opStack.peek();
op.addChild(exp);
} else {
resultCondition = exp;
}
}
/*
* One of the children for the current OP is a NO PUSH. Clear all the children
* added for the current OP that were a PUSH.
*/
private void clearChildren() {
if (!opStack.isEmpty()) {
OpState op = opStack.peek();
if (op.getChildren().size() >= 1) {
op.clear();
}
}
}
/*
* Pop the current operation from the stack (opStack) as we are done with it.
* If it has children, then it means the current OP itself is a PUSH
* and hence add itself as a child to the parent OP. If no more parents
* remains, it means that the current OP is the root condition and
* set the root condition in that case.
*/
private void popOpStackAndBuildFilter() {
// Parsing a special expression; handled holistically by the parent
if (holisticExpression > 0) {
return;
}
OpState currentOp = opStack.pop();
int size = currentOp.getChildren().size();
RexNode newFilter = null;
if (size >= 1) {
if (size == 1 && currentOp.getOp() instanceof SqlBinaryOperator) {
/* The only operator for which we allow partial pushes is AND.
* For all other operators we clear the children if one of the
* children is a no push.
*/
if (currentOp.getOp().getKind() == SqlKind.AND) {
newFilter = currentOp.getChildren().get(0);
for (OpState opState : opStack) {
if (opState.getOp().getKind() == SqlKind.NOT) {
//AND under NOT should not get pushed
newFilter = null;
}
}
}
} else {
newFilter = builder.makeCall(currentOp.getOp(), currentOp.getChildren());
}
}
if (newFilter != null) {
// add this new filter to my parent boolean operator's children
if (!opStack.isEmpty()) {
OpState parentOp = opStack.peek();
parentOp.addChild(newFilter);
} else {
resultCondition = newFilter;
}
}
}
private boolean isHolisticExpression(RexCall call) {
/* If we are already processing a holistic expression then all
* nested expressions should be treated holistically as well
*/
if (holisticExpression > 0) {
return true;
}
if (call.getOperator().getSyntax() == SqlSyntax.SPECIAL ||
call.getOperator().getSyntax() == SqlSyntax.FUNCTION) {
return true;
}
return false;
}
protected boolean inputRefToPush(RexInputRef inputRef) {
return dirs.get(inputRef.getIndex());
}
public Void visitInputRef(RexInputRef inputRef) {
if (inputRefToPush(inputRef)) {
pushStatusStack.add(PushDirFilter.PUSH);
addResult(inputRef);
referencedDirs.set(inputRef.getIndex());
} else {
pushStatusStack.add(PushDirFilter.NO_PUSH);
}
return null;
}
public Void visitLiteral(RexLiteral literal) {
pushStatusStack.add(PushDirFilter.PUSH);
addResult(literal);
return null;
}
public Void visitOver(RexOver over) {
// assume NO_PUSH until proven otherwise
analyzeCall(over, PushDirFilter.NO_PUSH);
return null;
}
public Void visitCorrelVariable(RexCorrelVariable correlVariable) {
return pushVariable();
}
public Void visitCall(RexCall call) {
analyzeCall(call, PushDirFilter.PUSH);
return null;
}
/*
* Traverse over the RexCall recursively. All children that are potential
* PUSH are added as children to the current operator. Once all children
* of the current operator are processed and if it satisfies to be a
* PUSH then the current operator is added as a child to its parent and so on.
*
* This bottom up building does not work for some expressions like 'cast'.
* In that case, we don't add the current operator or its children in the
* opStack and based on whether the special expression is a PUSH we add the
* whole expression to its parent op. Fpr such special expressions we maintain
* specialExpression count.
*/
private void analyzeCall(RexCall call, PushDirFilter callPushDirFilter) {
if (isHolisticExpression(call)) {
holisticExpression++;
} else {
opStack.push(new OpState(call.getOperator()));
}
// visit operands, pushing their states onto stack
super.visitCall(call);
// look for NO_PUSH operands
int operandCount = call.getOperands().size();
List<PushDirFilter> operandStack = Util.last(pushStatusStack, operandCount);
for (PushDirFilter operandPushDirFilter : operandStack) {
if (operandPushDirFilter == PushDirFilter.NO_PUSH) {
callPushDirFilter = PushDirFilter.NO_PUSH;
} else if (operandPushDirFilter == PushDirFilter.PARTIAL_PUSH) {
callPushDirFilter = PushDirFilter.PARTIAL_PUSH;
}
}
// Even if all operands are PUSH, the call itself may
// be non-deterministic.
if (!call.getOperator().isDeterministic()) {
callPushDirFilter = PushDirFilter.NO_PUSH;
} else if (call.getOperator().isDynamicFunction()) {
// For now, treat it same as non-deterministic.
callPushDirFilter = PushDirFilter.NO_PUSH;
}
// Row operator itself can't be reduced to a PUSH
if ((callPushDirFilter == PushDirFilter.PUSH)
&& (call.getOperator() instanceof SqlRowOperator)) {
callPushDirFilter = PushDirFilter.NO_PUSH;
}
if (callPushDirFilter == PushDirFilter.NO_PUSH) {
OpState currentOp = opStack.peek();
if (currentOp != null) {
if (currentOp.sqlOperator.getKind() != SqlKind.AND) {
clearChildren();
} else {
// AND op, check if we pushed some children
if (currentOp.children.size() > 0) {
callPushDirFilter = PushDirFilter.PARTIAL_PUSH;
}
}
}
}
// pop operands off of the stack
operandStack.clear();
if (isHolisticExpression(call)) {
assert holisticExpression > 0;
holisticExpression--;
if (callPushDirFilter == PushDirFilter.PUSH) {
addResult(call);
}
} else {
popOpStackAndBuildFilter();
}
// push PushDirFilter result for this call onto stack
pushStatusStack.add(callPushDirFilter);
}
public Void visitDynamicParam(RexDynamicParam dynamicParam) {
return pushVariable();
}
public Void visitRangeRef(RexRangeRef rangeRef) {
return pushVariable();
}
public Void visitFieldAccess(RexFieldAccess fieldAccess) {
return pushVariable();
}
}