blob: 34c88a6e117d014db832e203647c454a5ba511fe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.rewrite;
import com.google.common.collect.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule.FilterPushDownContext;
import org.apache.tajo.util.TUtil;
import java.util.*;
/**
* This rule tries to push down all filter conditions into logical nodes as lower as possible.
* It is likely to significantly reduces the intermediate data.
*/
public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownContext, LogicalNode>
implements RewriteRule {
private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
private static final String NAME = "FilterPushDown";
/**
 * Per-rewrite working state: the set of filter predicates that are currently
 * being pushed down toward the leaves of the plan.
 */
static class FilterPushDownContext {
  Set<EvalNode> pushingDownFilters = new HashSet<EvalNode>();

  /** Discards every filter currently scheduled for push-down. */
  public void clear() {
    pushingDownFilters.clear();
  }

  /** Replaces the working filter set with the given evals. */
  public void setFiltersTobePushed(Collection<EvalNode> workingEvals) {
    this.pushingDownFilters.clear();
    this.pushingDownFilters.addAll(workingEvals);
  }

  /** Adds the given evals to the working filter set. */
  public void addFiltersTobePushed(Collection<EvalNode> workingEvals) {
    this.pushingDownFilters.addAll(workingEvals);
  }

  /**
   * Maps each remaining (transformed) filter back to its original eval using
   * {@code evalMap} (copy -&gt; origin) and makes those originals the working set.
   * Transformed evals without a recorded origin are dropped.
   */
  public void setToOrigin(Map<EvalNode, EvalNode> evalMap) {
    List<EvalNode> restored = new ArrayList<EvalNode>();
    for (EvalNode transformed : pushingDownFilters) {
      EvalNode original = evalMap.get(transformed);
      if (original != null) {
        restored.add(original);
      }
    }
    setFiltersTobePushed(restored);
  }
}
/**
 * Returns the registered name of this rewrite rule.
 */
@Override
public String getName() {
  return NAME;
}
/**
 * The rule applies as soon as any query block contains a selection (WHERE)
 * or a join node; otherwise there is nothing to push down.
 */
@Override
public boolean isEligible(LogicalPlan plan) {
  for (LogicalPlan.QueryBlock queryBlock : plan.getQueryBlocks()) {
    boolean hasSelection = queryBlock.hasNode(NodeType.SELECTION);
    boolean hasJoin = queryBlock.hasNode(NodeType.JOIN);
    if (hasSelection || hasJoin) {
      return true;
    }
  }
  return false;
}
/**
 * Entry point of the rule. Visits every query block with a fresh context and
 * lets the per-node visit methods do the actual pushing.
 *
 * Summary of per-node processing:
 * - A filter eval whose corresponding target is not a FieldEval is not pushed down.
 * - Filter eval columns are replaced with the child node's output columns; if no
 *   such output column exists, the eval is not pushed down.
 * - At a ScanNode, pushable evals are merged into the scan's qual.
 * - At a GroupByNode, evals over aggregation columns are moved into the parent
 *   HavingNode (creating one if necessary).
 */
@Override
public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
  FilterPushDownContext context = new FilterPushDownContext();
  for (LogicalPlan.QueryBlock queryBlock : plan.getQueryBlocks()) {
    // each block starts with an empty push-down set
    context.clear();
    this.visit(context, plan, queryBlock, queryBlock.getRoot(), new Stack<LogicalNode>());
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("=============================================");
    LOG.debug("FilterPushDown Optimized Query: \n" + plan.toString());
    LOG.debug("=============================================");
  }
  return plan;
}
/**
 * Collects the selection's predicate (split into CNF conjuncts) into the push-down
 * set, visits the child subtree, and afterwards either removes the now-redundant
 * selection node (all conjuncts pushed) or re-installs the conjuncts that can
 * still be evaluated at this node.
 */
@Override
public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                               SelectionNode selNode, Stack<LogicalNode> stack) throws PlanningException {
  // split the WHERE predicate into independent conjuncts so each can be pushed separately
  context.pushingDownFilters.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));

  stack.push(selNode);
  visit(context, plan, block, selNode.getChild(), stack);
  stack.pop();

  if(context.pushingDownFilters.size() == 0) {
    // remove the selection operator if there is no search condition after selection push.
    LogicalNode node = stack.peek();
    if (node instanceof UnaryNode) {
      // splice this selection out: parent now points directly at the selection's child
      UnaryNode unary = (UnaryNode) node;
      unary.setChild(selNode.getChild());
    } else {
      throw new InvalidQueryException("Unexpected Logical Query Plan");
    }
  } else { // if there remain search conditions
    // check if it can be evaluated here
    Set<EvalNode> matched = TUtil.newHashSet();
    for (EvalNode eachEval : context.pushingDownFilters) {
      if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
        matched.add(eachEval);
      }
    }

    // if there are search conditions which can be evaluated here,
    // push down them and remove them from context.pushingDownFilters.
    if (matched.size() > 0) {
      // replace the original qual with only the conjuncts that stay here
      selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
      context.pushingDownFilters.removeAll(matched);
    }
  }

  return selNode;
}
/**
 * Pushes filters through a join node. For outer joins, predicates that reference
 * the null-supplying side are held back (TAJO-853) so semantics are preserved;
 * join predicates from the ON clause and pushable filters that can be evaluated
 * at this join are merged into the join qual, and everything else is handed back
 * to the caller via the context.
 */
@Override
public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
                             Stack<LogicalNode> stack) throws PlanningException {
  // here we should stop selection pushdown on the null supplying side(s) of an outer join
  // get the two operands of the join operation as well as the join type
  JoinType joinType = joinNode.getJoinType();
  EvalNode joinQual = joinNode.getJoinQual();
  if (joinQual != null && LogicalPlanner.isOuterJoin(joinType)) {
    // NOTE(review): this assumes an outer-join qual is always a BinaryEval; a
    // non-binary qual would throw ClassCastException here — confirm the planner
    // guarantees this shape upstream.
    BinaryEval binaryEval = (BinaryEval) joinQual;
    // if both are fields
    if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
        binaryEval.getRightExpr().getType() == EvalType.FIELD) {
      String leftTableName = ((FieldEval) binaryEval.getLeftExpr()).getQualifier();
      String rightTableName = ((FieldEval) binaryEval.getRightExpr()).getQualifier();
      List<String> nullSuppliers = Lists.newArrayList();
      // all relations reachable from each join input, used to validate the null supplier
      Set<String> leftTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
          joinNode.getLeftChild()));
      Set<String> rightTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
          joinNode.getRightChild()));

      // some verification
      if (joinType == JoinType.FULL_OUTER) {
        // both sides can supply nulls
        nullSuppliers.add(leftTableName);
        nullSuppliers.add(rightTableName);

        // verify that these null suppliers are indeed in the left and right sets
        if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
          throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
        }
        if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
          throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
        }
      } else if (joinType == JoinType.LEFT_OUTER) {
        // the right side supplies nulls in a left outer join
        nullSuppliers.add(((RelationNode)joinNode.getRightChild()).getCanonicalName());
        //verify that this null supplier is indeed in the right sub-tree
        if (!rightTableSet.contains(nullSuppliers.get(0))) {
          throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
        }
      } else if (joinType == JoinType.RIGHT_OUTER) {
        // the left side supplies nulls in a right outer join
        if (((RelationNode)joinNode.getRightChild()).getCanonicalName().equals(rightTableName)) {
          nullSuppliers.add(leftTableName);
        } else {
          nullSuppliers.add(rightTableName);
        }

        // verify that this null supplier is indeed in the left sub-tree
        if (!leftTableSet.contains(nullSuppliers.get(0))) {
          throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
        }
      }
    }
  }

  // get evals from ON clause
  List<EvalNode> onConditions = new ArrayList<EvalNode>();
  if (joinNode.hasJoinQual()) {
    onConditions.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
  }

  // top-most join = the node directly above this one is not another join
  boolean isTopMostJoin = stack.peek().getType() != NodeType.JOIN;

  List<EvalNode> outerJoinPredicationEvals = new ArrayList<EvalNode>();
  List<EvalNode> outerJoinFilterEvalsExcludePredication = new ArrayList<EvalNode>();
  if (LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
    // TAJO-853
    // In the case of top most JOIN, all filters except JOIN condition aren't pushed down.
    // That filters are processed by SELECTION NODE.
    Set<String> nullSupplyingTableNameSet;
    if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
      nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
    } else {
      nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
    }

    Set<String> preservedTableNameSet;
    if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
      preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
    } else {
      preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
    }

    List<EvalNode> removedFromFilter = new ArrayList<EvalNode>();
    for (EvalNode eachEval: context.pushingDownFilters) {
      if (EvalTreeUtil.isJoinQual(eachEval, true)) {
        // a join predicate is evaluated by this JoinNode, not pushed any further
        outerJoinPredicationEvals.add(eachEval);
        removedFromFilter.add(eachEval);
      } else {
        // a filter touching a null-supplying relation must not sink below the outer join
        Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachEval);
        boolean canPushDown = true;
        for (Column eachColumn: columns) {
          if (nullSupplyingTableNameSet.contains(eachColumn.getQualifier())) {
            canPushDown = false;
            break;
          }
        }
        if (!canPushDown) {
          outerJoinFilterEvalsExcludePredication.add(eachEval);
          removedFromFilter.add(eachEval);
        }
      }
    }

    context.pushingDownFilters.removeAll(removedFromFilter);

    for (EvalNode eachOnEval: onConditions) {
      if (EvalTreeUtil.isJoinQual(eachOnEval, true)) {
        // If join condition, processing in the JoinNode.
        outerJoinPredicationEvals.add(eachOnEval);
      } else {
        // If Eval has a column which belong to Preserved Row table, not using to push down but using JoinCondition
        Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachOnEval);
        boolean canPushDown = true;
        for (Column eachColumn: columns) {
          if (preservedTableNameSet.contains(eachColumn.getQualifier())) {
            canPushDown = false;
            break;
          }
        }
        if (canPushDown) {
          context.pushingDownFilters.add(eachOnEval);
        } else {
          outerJoinPredicationEvals.add(eachOnEval);
        }
      }
    }
  } else {
    // inner/cross join: ON-clause conjuncts behave like ordinary filters and may be pushed freely
    context.pushingDownFilters.addAll(onConditions);
  }

  LogicalNode left = joinNode.getLeftChild();
  LogicalNode right = joinNode.getRightChild();

  List<EvalNode> notMatched = new ArrayList<EvalNode>();
  // Join's input schema = right child output columns + left child output columns
  Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, joinNode, left, notMatched, null, true,
      0);
  context.setFiltersTobePushed(transformedMap.keySet());
  visit(context, plan, block, left, stack);

  // map survivors of the left subtree back to their original evals and re-add the unpushable ones
  context.setToOrigin(transformedMap);
  context.addFiltersTobePushed(notMatched);

  notMatched.clear();
  // the right child's columns are offset by the left child's output width in the join input schema
  transformedMap = findCanPushdownAndTransform(context, joinNode, right, notMatched, null, true, left.getOutSchema().size());
  context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));

  visit(context, plan, block, right, stack);

  context.setToOrigin(transformedMap);
  context.addFiltersTobePushed(notMatched);

  notMatched.clear();
  List<EvalNode> matched = Lists.newArrayList();
  if(LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
    matched.addAll(outerJoinPredicationEvals);
  } else {
    for (EvalNode eval : context.pushingDownFilters) {
      if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, isTopMostJoin)) {
        matched.add(eval);
      }
    }
  }

  EvalNode qual = null;
  if (matched.size() > 1) {
    // merged into one eval tree
    qual = AlgebraicUtil.createSingletonExprFromCNF(
        matched.toArray(new EvalNode[matched.size()]));
  } else if (matched.size() == 1) {
    // if the number of matched expr is one
    qual = matched.get(0);
  }

  if (qual != null) {
    joinNode.setJoinQual(qual);

    // a cross join that gained a predicate becomes an inner join
    if (joinNode.getJoinType() == JoinType.CROSS) {
      joinNode.setJoinType(JoinType.INNER);
    }
    context.pushingDownFilters.removeAll(matched);
  }

  // held-back outer-join filters flow back up to be handled by an upper selection node
  context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludePredication);
  return joinNode;
}
/**
 * Clones each eval in {@code originEvals} and rewrites its column references so the
 * eval can bypass {@code node} and be evaluated at/below {@code childNode}.
 * Returns a map of transformed copy -&gt; original eval.
 *
 * Three cases are handled:
 * - {@code node} is a UNION: eval columns are simple names; requalify them against
 *   the child's output schema.
 * - {@code childNode} is a UNION: strip qualifiers so simple names match the union output.
 * - otherwise: map node input columns to child output columns positionally (by index).
 *
 * NOTE(review): the method name presumably meant "...WithByPassNode"; kept as-is
 * because callers throughout this class use this spelling.
 *
 * @throws PlanningException if an eval cannot be cloned or references a column with
 *         no corresponding child output column
 */
private Map<EvalNode, EvalNode> transformEvalsWidthByPassNode(
    Collection<EvalNode> originEvals, LogicalPlan plan,
    LogicalPlan.QueryBlock block,
    LogicalNode node, LogicalNode childNode) throws PlanningException {
  // transformed -> pushingDownFilters
  Map<EvalNode, EvalNode> transformedMap = new HashMap<EvalNode, EvalNode>();

  if (originEvals.isEmpty()) {
    return transformedMap;
  }

  if (node.getType() == NodeType.UNION) {
    // If node is union, All eval's columns are simple name and matched with child's output schema.
    Schema childOutSchema = childNode.getOutSchema();
    for (EvalNode eval : originEvals) {
      EvalNode copy;
      try {
        copy = (EvalNode) eval.clone();
      } catch (CloneNotSupportedException e) {
        throw new PlanningException(e);
      }

      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
      for (Column c : columns) {
        Column column = childOutSchema.getColumn(c.getSimpleName());
        if (column == null) {
          throw new PlanningException(
              "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
                  + c.getQualifiedName() + " for FilterPushDown(" + eval + "), " +
                  "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")");
        }
        EvalTreeUtil.changeColumnRef(copy, c.getSimpleName(), column.getQualifiedName());
      }

      transformedMap.put(copy, eval);
    }
    return transformedMap;
  }

  if (childNode.getType() == NodeType.UNION) {
    // If child is union, remove qualifier from eval's column
    for (EvalNode eval : originEvals) {
      EvalNode copy;
      try {
        copy = (EvalNode) eval.clone();
      } catch (CloneNotSupportedException e) {
        throw new PlanningException(e);
      }

      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
      for (Column c : columns) {
        if (c.hasQualifier()) {
          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName());
        }
      }

      transformedMap.put(copy, eval);
    }

    return transformedMap;
  }

  // node in column -> child out column, matched positionally (index i in the node's
  // input schema corresponds to index i in the child's output schema)
  Map<String, String> columnMap = new HashMap<String, String>();
  for (int i = 0; i < node.getInSchema().size(); i++) {
    String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
    Column childOutColumn = childNode.getOutSchema().getColumn(i);
    columnMap.put(inColumnName, childOutColumn.getQualifiedName());
  }

  // Rename from upper block's one to lower block's one
  for (EvalNode matchedEval : originEvals) {
    EvalNode copy;
    try {
      copy = (EvalNode) matchedEval.clone();
    } catch (CloneNotSupportedException e) {
      throw new PlanningException(e);
    }

    Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
    boolean allMatched = true;
    for (Column c : columns) {
      if (columnMap.containsKey(c.getQualifiedName())) {
        EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), columnMap.get(c.getQualifiedName()));
      } else {
        if (childNode.getType() == NodeType.GROUP_BY) {
          // an aggregation output column has no input counterpart; skip this eval
          if (((GroupbyNode) childNode).isAggregationColumn(c.getSimpleName())) {
            allMatched = false;
            break;
          }
        } else {
          throw new PlanningException(
              "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
                  + c.getQualifiedName() + " for FilterPushDown(" + matchedEval + "), " +
                  "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")"
          );
        }
      }
    }
    if (allMatched) {
      transformedMap.put(copy, matchedEval);
    }
  }

  return transformedMap;
}
/**
 * Pushes filters across a table-subquery (block) boundary: only filters that can be
 * evaluated at this relation are transformed to the subquery's naming and pushed into
 * the subquery's own query block; survivors are mapped back to their original evals.
 */
@Override
public LogicalNode visitTableSubQuery(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                      TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
  List<EvalNode> matched = Lists.newArrayList();
  for (EvalNode eval : context.pushingDownFilters) {
    if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
      matched.add(eval);
    }
  }

  // transformed -> pushingDownFilters
  Map<EvalNode, EvalNode> transformedMap =
      transformEvalsWidthByPassNode(matched, plan, block, node, node.getSubQuery());

  context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
  // descend into the subquery's own query block
  visit(context, plan, plan.getBlock(node.getSubQuery()));
  context.setToOrigin(transformedMap);

  return node;
}
/**
 * Pushes the current filters into BOTH children of a union. Each child must absorb
 * every filter; any leftover on either side is a planning error, since a filter
 * applied to only one branch would change the union's result.
 */
@Override
public LogicalNode visitUnion(FilterPushDownContext context, LogicalPlan plan,
                              LogicalPlan.QueryBlock block, UnionNode unionNode,
                              Stack<LogicalNode> stack) throws PlanningException {
  LogicalNode leftNode = unionNode.getLeftChild();

  // snapshot the originals so the same set can be pushed into both branches
  List<EvalNode> origins = new ArrayList<EvalNode>(context.pushingDownFilters);

  // transformed -> pushingDownFilters
  Map<EvalNode, EvalNode> transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, leftNode);
  context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
  visit(context, plan, plan.getBlock(leftNode));

  if (!context.pushingDownFilters.isEmpty()) {
    errorFilterPushDown(plan, leftNode, context);
  }

  LogicalNode rightNode = unionNode.getRightChild();
  transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, rightNode);
  context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
  // NOTE(review): the right branch is visited with a different overload than the left
  // (explicit node + stack vs. block-only) — presumably equivalent for union children;
  // confirm against BasicLogicalPlanVisitor.
  visit(context, plan, plan.getBlock(rightNode), rightNode, stack);

  if (!context.pushingDownFilters.isEmpty()) {
    errorFilterPushDown(plan, rightNode, context);
  }

  // notify all filter matched to upper
  context.pushingDownFilters.clear();
  return unionNode;
}
/**
 * Pushes filters through a projection. Filters the child could not absorb are either
 * re-installed as a new SelectionNode directly below this projection (when evaluable
 * here) or translated back to their original form and returned to the upper block.
 */
@Override
public LogicalNode visitProjection(FilterPushDownContext context,
                                   LogicalPlan plan,
                                   LogicalPlan.QueryBlock block,
                                   ProjectionNode projectionNode,
                                   Stack<LogicalNode> stack) throws PlanningException {
  LogicalNode childNode = projectionNode.getChild();

  List<EvalNode> notMatched = new ArrayList<EvalNode>();

  //copy -> origin
  BiMap<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(
      context, projectionNode, childNode, notMatched, null, false, 0);

  context.setFiltersTobePushed(transformedMap.keySet());

  stack.push(projectionNode);
  childNode = visit(context, plan, plan.getBlock(childNode), childNode, stack);
  stack.pop();

  // find not matched after visiting child
  for (EvalNode eval: context.pushingDownFilters) {
    // NOTE(review): get() can return null if an eval entered the set during the child
    // visit without a transformedMap entry — confirm whether that case can occur here.
    notMatched.add(transformedMap.get(eval));
  }

  EvalNode qual = null;
  if (notMatched.size() > 1) {
    // merged into one eval tree
    qual = AlgebraicUtil.createSingletonExprFromCNF(notMatched.toArray(new EvalNode[notMatched.size()]));
  } else if (notMatched.size() == 1) {
    // if the number of matched expr is one
    qual = notMatched.get(0);
  }

  // If there is not matched node add SelectionNode and clear context.pushingDownFilters
  if (qual != null && LogicalPlanner.checkIfBeEvaluatedAtThis(qual, projectionNode)) {
    // splice a SelectionNode between the projection and its child
    SelectionNode selectionNode = plan.createNode(SelectionNode.class);
    selectionNode.setInSchema(childNode.getOutSchema());
    selectionNode.setOutSchema(childNode.getOutSchema());
    selectionNode.setQual(qual);
    block.registerNode(selectionNode);

    projectionNode.setChild(selectionNode);
    selectionNode.setChild(childNode);

    // clean all remain filters because all conditions are merged into a qual
    context.pushingDownFilters.clear();
  }

  // if there are remain filters, recover the original names and give back to the upper query block.
  if (context.pushingDownFilters.size() > 0) {
    ImmutableSet<EvalNode> copy = ImmutableSet.copyOf(context.pushingDownFilters);
    context.pushingDownFilters.clear();
    context.pushingDownFilters.addAll(reverseTransform(transformedMap, copy));
  }

  return projectionNode;
}
/**
 * Maps each remaining transformed filter back to its original eval via the
 * copy -&gt; origin bi-map.
 */
private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
  Set<EvalNode> originals = Sets.newHashSet();
  for (EvalNode remaining : remainFilters) {
    originals.add(map.get(remaining));
  }
  return originals;
}
/**
 * Partitions the context's filters into pushable and non-pushable sets relative to
 * {@code node}. A filter is pushable only when every column it references resolves to
 * a FieldEval target of {@code node}; pushable filters are cloned and rewritten to the
 * child's output column names. Returns a bi-map of transformed copy -&gt; original;
 * non-pushable filters are appended to {@code notMatched}.
 *
 * @param partitionColumns partition-column names for scan targets, or null
 * @param ignoreJoin when true, join predicates are never pushed (left to the JoinNode)
 * @param columnOffset index offset into the node's input schema (used for the right
 *        side of a join, whose columns follow the left side's)
 */
private BiMap<EvalNode, EvalNode> findCanPushdownAndTransform(
    FilterPushDownContext context, Projectable node,
    LogicalNode childNode, List<EvalNode> notMatched,
    Set<String> partitionColumns,
    boolean ignoreJoin, int columnOffset) throws PlanningException {
  // canonical name -> target
  Map<String, Target> nodeTargetMap = new HashMap<String, Target>();
  for (Target target : node.getTargets()) {
    nodeTargetMap.put(target.getCanonicalName(), target);
  }

  // copy -> origin
  BiMap<EvalNode, EvalNode> matched = HashBiMap.create();
  for (EvalNode eval : context.pushingDownFilters) {
    if (ignoreJoin && EvalTreeUtil.isJoinQual(eval, true)) {
      notMatched.add(eval);
      continue;
    }
    // If all column is field eval, can push down.
    Set<Column> evalColumns = EvalTreeUtil.findUniqueColumns(eval);
    boolean columnMatched = true;
    for (Column c : evalColumns) {
      Target target = nodeTargetMap.get(c.getQualifiedName());
      if (target == null) {
        columnMatched = false;
        break;
      }
      if (target.getEvalTree().getType() != EvalType.FIELD) {
        // a computed target (expression) cannot be pushed below the node that computes it
        columnMatched = false;
        break;
      }
    }

    if (columnMatched) {
      // transform eval column to child's output column
      EvalNode copyEvalNode = transformEval(node, childNode, eval, nodeTargetMap, partitionColumns, columnOffset);
      if (copyEvalNode != null) {
        matched.put(copyEvalNode, eval);
      } else {
        notMatched.add(eval);
      }
    } else {
      notMatched.add(eval);
    }
  }

  return matched;
}
/**
 * Clones {@code origin} and rewrites each referenced column to the corresponding
 * output column of {@code childNode} (or of {@code node}'s input schema when there is
 * no child). Returns null when a column falls outside the child's output range and is
 * not a partition column — meaning the eval cannot be pushed through this node.
 *
 * @param columnOffset subtracted from each resolved column index; used for the right
 *        side of a join whose columns follow the left side's in the input schema
 * @throws PlanningException if a referenced column has no target, or its target is
 *         not a FieldEval, or cloning fails
 */
private EvalNode transformEval(Projectable node, LogicalNode childNode, EvalNode origin,
                               Map<String, Target> targetMap, Set<String> partitionColumns,
                               int columnOffset) throws PlanningException {
  Schema outputSchema = childNode != null ? childNode.getOutSchema() : node.getInSchema();

  EvalNode copy;
  try {
    copy = (EvalNode) origin.clone();
  } catch (CloneNotSupportedException e) {
    throw new PlanningException(e);
  }

  Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
  for (Column c: columns) {
    Target target = targetMap.get(c.getQualifiedName());
    if (target == null) {
      throw new PlanningException(
          "Invalid Filter PushDown: No such a corresponding target '"
              + c.getQualifiedName() + " for FilterPushDown(" + origin + "), " +
              "(PID=" + node.getPID() + ")"
      );
    }
    EvalNode targetEvalNode = target.getEvalTree();
    if (targetEvalNode.getType() != EvalType.FIELD) {
      throw new PlanningException(
          "Invalid Filter PushDown: '" + c.getQualifiedName() + "' target is not FieldEval " +
              "(PID=" + node.getPID() + ")"
      );
    }

    FieldEval fieldEval = (FieldEval)targetEvalNode;
    Column targetInputColumn = fieldEval.getColumnRef();

    // resolve the referenced column's position in the node's input schema
    int index;
    if (targetInputColumn.hasQualifier()) {
      index = node.getInSchema().getColumnId(targetInputColumn.getQualifiedName());
    } else {
      index = node.getInSchema().getColumnIdByName(targetInputColumn.getQualifiedName());
    }
    if (columnOffset > 0) {
      index = index - columnOffset;
    }
    if (index < 0 || index >= outputSchema.size()) {
      // out of the child's output range: only allowed if it is a scan's partition column
      if (partitionColumns != null && !partitionColumns.isEmpty() && node instanceof ScanNode) {
        ScanNode scanNode = (ScanNode)node;
        boolean isPartitionColumn = false;
        // partition columns may be stored fully-qualified (e.g. under HCatalog) or simple
        if (CatalogUtil.isFQColumnName(partitionColumns.iterator().next())) {
          isPartitionColumn = partitionColumns.contains(
              CatalogUtil.buildFQName(scanNode.getTableName(), c.getSimpleName()));
        } else {
          isPartitionColumn = partitionColumns.contains(c.getSimpleName());
        }
        if (isPartitionColumn) {
          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(),
              scanNode.getCanonicalName() + "." + c.getSimpleName());
        } else {
          return null;
        }
      } else {
        return null;
      }
    } else {
      Column outputColumn = outputSchema.getColumn(index);
      EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
    }
  }

  return copy;
}
/**
 * Finds filter evals that reference aggregation-function output columns and installs
 * them as a HAVING predicate: merged into the existing {@code havingNode}'s qual, or
 * into a new HavingNode spliced between {@code parentNode} and {@code groupByNode}.
 *
 * @param context holds the filters currently being pushed down
 * @param plan the logical plan, used to create new nodes
 * @param block the query block owning the group-by
 * @param parentNode If null, having is parent
 * @param havingNode If null, projection is parent
 * @param groupByNode the aggregation node whose output columns are inspected
 * @return matched origin eval
 * @throws PlanningException if a filter eval cannot be cloned
 */
private List<EvalNode> addHavingNode(FilterPushDownContext context, LogicalPlan plan,
                                     LogicalPlan.QueryBlock block,
                                     UnaryNode parentNode,
                                     HavingNode havingNode,
                                     GroupbyNode groupByNode) throws PlanningException {
  // Output columns that are not grouping keys are aggregation-function results;
  // predicates over them cannot be pushed below the group-by and belong in HAVING.
  Set<Column> groupingColumns = new HashSet<Column>(Arrays.asList(groupByNode.getGroupingColumns()));
  Set<String> aggrFunctionOutColumns = new HashSet<String>();
  for (Column column : groupByNode.getOutSchema().getColumns()) {
    if (!groupingColumns.contains(column)) {
      aggrFunctionOutColumns.add(column.getQualifiedName());
    }
  }

  List<EvalNode> aggrEvalOrigins = new ArrayList<EvalNode>();
  List<EvalNode> aggrEvals = new ArrayList<EvalNode>();

  for (EvalNode eval : context.pushingDownFilters) {
    EvalNode copy;
    try {
      copy = (EvalNode) eval.clone();
    } catch (CloneNotSupportedException e) {
      // BUGFIX: previously swallowed, leaving copy == null and causing an NPE below;
      // propagate like the other clone sites in this rule.
      throw new PlanningException(e);
    }

    boolean isEvalAggrFunction = false;
    for (Column evalColumn : EvalTreeUtil.findUniqueColumns(copy)) {
      if (aggrFunctionOutColumns.contains(evalColumn.getSimpleName())) {
        // HAVING evaluates over the group-by output, which exposes simple names
        EvalTreeUtil.changeColumnRef(copy, evalColumn.getQualifiedName(), evalColumn.getSimpleName());
        isEvalAggrFunction = true;
        break;
      }
    }

    if (isEvalAggrFunction) {
      aggrEvals.add(copy);
      aggrEvalOrigins.add(eval);
    }
  }

  if (aggrEvals.isEmpty()) {
    return aggrEvalOrigins;
  }

  // Reuse the existing HAVING node (merging its current qual) or create and splice a new one.
  HavingNode workingHavingNode;
  if (havingNode != null) {
    workingHavingNode = havingNode;
    aggrEvals.add(havingNode.getQual());
  } else {
    workingHavingNode = plan.createNode(HavingNode.class);
    block.registerNode(workingHavingNode);
    parentNode.setChild(workingHavingNode);
    workingHavingNode.setChild(groupByNode);
  }

  EvalNode qual = null;
  if (aggrEvals.size() > 1) {
    // merged into one eval tree
    qual = AlgebraicUtil.createSingletonExprFromCNF(aggrEvals.toArray(new EvalNode[aggrEvals.size()]));
  } else if (aggrEvals.size() == 1) {
    // if the number of matched expr is one
    qual = aggrEvals.get(0);
  }

  if (qual != null) {
    workingHavingNode.setQual(qual);
  }

  return aggrEvalOrigins;
}
/**
 * Keeps this window-aggregation node visible on the traversal stack while its
 * subtree is visited, then hands control to the default traversal.
 */
@Override
public LogicalNode visitWindowAgg(FilterPushDownContext context, LogicalPlan plan,
                                  LogicalPlan.QueryBlock block, WindowAggNode winAggNode,
                                  Stack<LogicalNode> stack) throws PlanningException {
  stack.push(winAggNode);
  super.visitWindowAgg(context, plan, block, winAggNode, stack);
  stack.pop();
  return winAggNode;
}
/**
 * Handles filters at a group-by node. Filters over aggregation outputs are moved
 * into a HAVING clause (reusing the parent HavingNode when one exists); the rest are
 * transformed to the child's naming and pushed further down.
 */
@Override
public LogicalNode visitGroupBy(FilterPushDownContext context, LogicalPlan plan,
                                LogicalPlan.QueryBlock block, GroupbyNode groupbyNode,
                                Stack<LogicalNode> stack) throws PlanningException {
  LogicalNode parentNode = stack.peek();
  List<EvalNode> aggrEvals;
  if (parentNode.getType() == NodeType.HAVING) {
    aggrEvals = addHavingNode(context, plan, block, null, (HavingNode)parentNode, groupbyNode);
  } else {
    aggrEvals = addHavingNode(context, plan, block, (UnaryNode)parentNode, null, groupbyNode);
  }

  if (aggrEvals != null) {
    // remove aggregation eval from context
    context.pushingDownFilters.removeAll(aggrEvals);
  }

  List<EvalNode> notMatched = new ArrayList<EvalNode>();
  // transform the remaining filters to the child's column names before descending
  Map<EvalNode, EvalNode> tranformed =
      findCanPushdownAndTransform(context, groupbyNode,groupbyNode.getChild(), notMatched, null, false, 0);

  context.setFiltersTobePushed(tranformed.keySet());
  LogicalNode current = super.visitGroupBy(context, plan, block, groupbyNode, stack);

  // restore originals for survivors and re-add filters that could not be pushed
  context.setToOrigin(tranformed);
  context.addFiltersTobePushed(notMatched);

  return current;
}
/**
 * Terminal push-down point: installs every filter that can be evaluated at this scan
 * as the scan's qual. Single-column predicates over partition columns are requalified
 * to the scan's canonical name so they can prune partitions; everything else that
 * cannot be absorbed is handed back to the caller via the context.
 */
@Override
public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
                             LogicalPlan.QueryBlock block, ScanNode scanNode,
                             Stack<LogicalNode> stack) throws PlanningException {
  List<EvalNode> matched = Lists.newArrayList();

  // find partition column and check matching
  Set<String> partitionColumns = new HashSet<String>();
  TableDesc table = scanNode.getTableDesc();
  boolean hasQualifiedName = false;
  if (table.hasPartition()) {
    for (Column c: table.getPartitionMethod().getExpressionSchema().getColumns()) {
      partitionColumns.add(c.getQualifiedName());
      hasQualifiedName = c.hasQualifier();
    }
  }
  Set<EvalNode> partitionEvals = new HashSet<EvalNode>();
  for (EvalNode eval : context.pushingDownFilters) {
    if (table.hasPartition()) {
      Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
      // only single-column predicates are considered for partition pruning
      if (columns.size() != 1) {
        continue;
      }

      Column column = columns.iterator().next();

      // If catalog runs with HCatalog, partition column is a qualified name
      // Else partition column is a simple name
      boolean isPartitionColumn = false;
      if (hasQualifiedName) {
        isPartitionColumn = partitionColumns.contains(CatalogUtil.buildFQName(table.getName(), column.getSimpleName()));
      } else {
        isPartitionColumn = partitionColumns.contains(column.getSimpleName());
      }
      if (isPartitionColumn) {
        EvalNode copy;
        try {
          copy = (EvalNode) eval.clone();
        } catch (CloneNotSupportedException e) {
          throw new PlanningException(e);
        }
        // requalify the column under the scan's canonical (possibly aliased) name
        EvalTreeUtil.changeColumnRef(copy, column.getQualifiedName(),
            scanNode.getCanonicalName() + "." + column.getSimpleName());
        matched.add(copy);
        partitionEvals.add(eval);
      }
    }
  }

  context.pushingDownFilters.removeAll(partitionEvals);

  List<EvalNode> notMatched = new ArrayList<EvalNode>();

  // transform the remaining filters to this scan's column naming
  Map<EvalNode, EvalNode> transformed =
      findCanPushdownAndTransform(context, scanNode, null, notMatched, partitionColumns, true, 0);

  for (EvalNode eval : transformed.keySet()) {
    if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {
      matched.add(eval);
    }
  }

  EvalNode qual = null;
  if (matched.size() > 1) {
    // merged into one eval tree
    qual = AlgebraicUtil.createSingletonExprFromCNF(
        matched.toArray(new EvalNode[matched.size()]));
  } else if (matched.size() == 1) {
    // if the number of matched expr is one
    qual = matched.iterator().next();
  }

  if (qual != null) { // if a matched qual exists
    scanNode.setQual(qual);
  }

  // absorbed evals must not flow back up as remaining filters
  for (EvalNode matchedEval: matched) {
    transformed.remove(matchedEval);
  }

  context.setToOrigin(transformed);
  context.addFiltersTobePushed(notMatched);

  return scanNode;
}
/**
 * Reports filters that no node could absorb. Always throws.
 *
 * @param plan the whole plan, included in the message for diagnosis
 * @param node the node at which the failure was detected
 * @param context holds the unmatched filters
 * @throws PlanningException always, listing the unmatched filters and the plan
 */
private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
                                 FilterPushDownContext context) throws PlanningException {
  // Build the comma-separated filter list with StringBuilder instead of
  // O(n^2) String concatenation in a loop; message text is unchanged.
  StringBuilder notMatchedNodeStr = new StringBuilder();
  String prefix = "";
  for (EvalNode notMatchedNode: context.pushingDownFilters) {
    notMatchedNodeStr.append(prefix).append(notMatchedNode);
    prefix = ", ";
  }
  throw new PlanningException("FilterPushDown failed cause some filters not matched: " + notMatchedNodeStr + "\n" +
      "Error node: " + node.getPlanString() + "\n" +
      plan.toString());
}
}