| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.engine.planner.rewrite; |
| |
| import com.google.common.collect.*; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.tajo.annotation.Nullable; |
| import org.apache.tajo.catalog.Column; |
| import org.apache.tajo.catalog.Schema; |
| import org.apache.tajo.catalog.SortSpec; |
| import org.apache.tajo.engine.eval.*; |
| import org.apache.tajo.engine.planner.*; |
| import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock; |
| import org.apache.tajo.engine.planner.logical.*; |
| import org.apache.tajo.engine.utils.SchemaUtil; |
| import org.apache.tajo.util.TUtil; |
| |
| import java.util.*; |
| |
/**
 * ProjectionPushDownRule deploys the expressions in a selection list to the proper
 * {@link org.apache.tajo.engine.planner.logical.Projectable}
 * nodes. In this process, expressions are usually pushed down as far as possible.
 * It also enables scanners to read only the necessary columns.
 */
| public class ProjectionPushDownRule extends |
| BasicLogicalPlanVisitor<ProjectionPushDownRule.Context, LogicalNode> implements RewriteRule { |
| /** Class Logger */ |
| private final Log LOG = LogFactory.getLog(ProjectionPushDownRule.class); |
| private static final String name = "ProjectionPushDown"; |
| |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| @Override |
| public boolean isEligible(LogicalPlan plan) { |
| LogicalNode toBeOptimized = plan.getRootBlock().getRoot(); |
| |
| if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) { |
| return false; |
| } |
| for (QueryBlock eachBlock: plan.getQueryBlocks()) { |
| if (eachBlock.hasTableExpression()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { |
| LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); |
| |
| LogicalPlan.QueryBlock topmostBlock = rootBlock; |
| |
| Stack<LogicalNode> stack = new Stack<LogicalNode>(); |
| Context context = new Context(plan); |
| visit(context, plan, topmostBlock, topmostBlock.getRoot(), stack); |
| |
| return plan; |
| } |
| |
| /** |
| * <h2>What is TargetListManager?</h2> |
| * It manages all expressions used in a query block, and their reference names. |
| * TargetListManager provides a way to find an expression by a reference name. |
| * It keeps a set of expressions, and one or more reference names can point to |
| * the same expression. |
| * |
| * Also, TargetListManager keeps the evaluation state of each expression. |
| * The evaluation state is a boolean state to indicate whether the expression |
| * was evaluated in descendant node or not. If an expression is evaluated, |
| * the evaluation state is changed to TRUE. It also means that |
| * the expression can be referred by an column reference instead of evaluating the expression. |
| * |
| * Consider an example query: |
| * |
| * SELECT sum(l_orderkey + 1) from lineitem where l_partkey > 1; |
| * |
| * In this case, an expression sum(l_orderkey + 1) is divided into two sub expressions: |
| * <ul> |
| * <li>$1 <- l_orderkey + 1</li> |
| * <li>$2 <- sum($1)</li> |
| * </ul> |
| * |
| * <code>$1</code> is a simple arithmetic operation, and $2 is an aggregation function. |
| * <code>$1</code> is evaluated in ScanNode because it's just a simple arithmetic operation. |
| * So, the evaluation state of l_orderkey + 1 initially |
| * is false, but the state will be true after ScanNode. |
| * |
| * In contrast, sum($1) is evaluated at GroupbyNode. So, its evaluation state is changed |
| * after GroupByNode. |
| * |
| * <h2>Why is TargetListManager necessary?</h2> |
| * |
| * Expressions used in a query block can be divided into various categories according to |
| * the possible {@link Projectable} nodes. Their references become available depending on |
| * the Projectable node at which expressions are evaluated. It manages the expressions and |
| * references for optimized places of expressions. It performs duplicated removal and enables |
| * common expressions to be shared with two or more Projectable nodes. It also helps Projectable |
| * nodes to find correct column references. |
| */ |
| public static class TargetListManager { |
| private Integer seqId = 0; |
| |
| /** |
| * Why should we use LinkedHashMap for those maps ? |
| * |
| * These maps are mainly by the target list of each projectable node |
| * (i.e., ProjectionNode, GroupbyNode, JoinNode, and ScanNode). |
| * The projection node removal occurs only when the projection node's output |
| * schema and its child's output schema are equivalent to each other. |
| * |
| * If we keep the inserted order of all expressions. It would make the possibility |
| * of projection node removal higher. |
| **/ |
| |
| /** A Map: Name -> Id */ |
| private LinkedHashMap<String, Integer> nameToIdBiMap; |
| /** Map: Id <-> EvalNode */ |
| private BiMap<Integer, EvalNode> idToEvalBiMap; |
| /** Map: Id -> Names */ |
| private LinkedHashMap<Integer, List<String>> idToNamesMap; |
| /** Map: Id -> Boolean */ |
| private LinkedHashMap<Integer, Boolean> evaluationStateMap; |
| /** Map: alias name -> Id */ |
| private LinkedHashMap<String, Integer> aliasMap; |
| |
| private LogicalPlan plan; |
| |
| public TargetListManager(LogicalPlan plan) { |
| this.plan = plan; |
| nameToIdBiMap = Maps.newLinkedHashMap(); |
| idToEvalBiMap = HashBiMap.create(); |
| idToNamesMap = Maps.newLinkedHashMap(); |
| evaluationStateMap = Maps.newLinkedHashMap(); |
| aliasMap = Maps.newLinkedHashMap(); |
| } |
| |
| private int getNextSeqId() { |
| return seqId++; |
| } |
| |
| /** |
| * If some expression is duplicated, we call an alias indicating the duplicated expression 'native alias'. |
| * This method checks whether a reference is native alias or not. |
| * |
| * @param name The reference name |
| * @return True if the reference is native alias. Otherwise, it will return False. |
| */ |
| public boolean isNativeAlias(String name) { |
| return aliasMap.containsKey(name); |
| } |
| |
| /** |
| * This method retrieves the name indicating actual expression that an given alias indicate. |
| * |
| * @param name an alias name |
| * @return Real reference name |
| */ |
| public String getRealReferenceName(String name) { |
| int refId = aliasMap.get(name); |
| return getPrimaryName(refId); |
| } |
| |
| /** |
| * Add an expression with a specified name, which is usually an alias. |
| * Later, you can refer this expression by the specified name. |
| */ |
| private String add(String specifiedName, EvalNode evalNode) throws PlanningException { |
| |
| // if a name already exists, it only just keeps an actual |
| // expression instead of a column reference. |
| if (nameToIdBiMap.containsKey(specifiedName)) { |
| int refId = nameToIdBiMap.get(specifiedName); |
| EvalNode found = idToEvalBiMap.get(refId); |
| if (found != null) { |
| if (evalNode.equals(found)) { // if input expression already exists |
| return specifiedName; |
| } else { |
| // The case where if existing reference name and a given reference name are the same to each other and |
| // existing EvalNode and a given EvalNode is the different |
| if (found.getType() != EvalType.FIELD && evalNode.getType() != EvalType.FIELD) { |
| throw new PlanningException("Duplicate alias: " + evalNode); |
| } |
| |
| if (found.getType() == EvalType.FIELD) { |
| idToEvalBiMap.forcePut(refId, evalNode); |
| } |
| } |
| } |
| } |
| |
| int refId; |
| if (idToEvalBiMap.inverse().containsKey(evalNode)) { |
| refId = idToEvalBiMap.inverse().get(evalNode); |
| aliasMap.put(specifiedName, refId); |
| |
| } else { |
| refId = getNextSeqId(); |
| idToEvalBiMap.put(refId, evalNode); |
| TUtil.putToNestedList(idToNamesMap, refId, specifiedName); |
| for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) { |
| add(new FieldEval(column)); |
| } |
| evaluationStateMap.put(refId, false); |
| } |
| |
| nameToIdBiMap.put(specifiedName, refId); |
| |
| return specifiedName; |
| } |
| |
| /** |
| * Adds an expression without any name. It returns an automatically |
| * generated name. It can be also used for referring this expression. |
| */ |
| public String add(EvalNode evalNode) throws PlanningException { |
| String name; |
| |
| if (evalNode.getType() == EvalType.FIELD) { |
| FieldEval fieldEval = (FieldEval) evalNode; |
| if (nameToIdBiMap.containsKey(fieldEval.getName())) { |
| int refId = nameToIdBiMap.get(fieldEval.getName()); |
| return getPrimaryName(refId); |
| } |
| } |
| |
| if (idToEvalBiMap.inverse().containsKey(evalNode)) { |
| int refId = idToEvalBiMap.inverse().get(evalNode); |
| return getPrimaryName(refId); |
| } |
| |
| if (evalNode.getType() == EvalType.FIELD) { |
| FieldEval fieldEval = (FieldEval) evalNode; |
| name = fieldEval.getName(); |
| } else { |
| name = plan.generateUniqueColumnName(evalNode); |
| } |
| |
| return add(name, evalNode); |
| } |
| |
| public Collection<String> getNames() { |
| return nameToIdBiMap.keySet(); |
| } |
| |
| public String add(Target target) throws PlanningException { |
| return add(target.getCanonicalName(), target.getEvalTree()); |
| } |
| |
| /** |
| * Each expression can have one or more names. |
| * We call a name added with an expression firstly as the primary name. |
| * It has a special meaning. Since duplicated expression in logical planning are removed, |
| * the primary name is only used for each expression during logical planning. |
| * |
| * @param refId The identifier of an expression |
| * @param name The name to check if it is the primary name. |
| * @return True if this name is the primary added name. Otherwise, False. |
| */ |
| private boolean isPrimaryName(int refId, String name) { |
| if (idToNamesMap.get(refId).size() > 0) { |
| return getPrimaryName(refId).equals(name); |
| } else { |
| return false; |
| } |
| } |
| |
| private String getPrimaryName(int refId) { |
| return idToNamesMap.get(refId).get(0); |
| } |
| |
| public Target getTarget(String name) { |
| if (!nameToIdBiMap.containsKey(name)) { |
| throw new RuntimeException("No Such target name: " + name); |
| } |
| int id = nameToIdBiMap.get(name); |
| EvalNode evalNode = idToEvalBiMap.get(id); |
| |
| // if it is a constant value, just returns a constant because it can be evaluated everywhere. |
| if (evalNode.getType() == EvalType.CONST) { |
| return new Target(evalNode, name); |
| } |
| |
| // if a name is not the primary name, it means that its expression may be already evaluated and |
| // can just refer a value. Consider an example as follows: |
| // |
| // select l_orderkey + 1 as total1, l_orderkey + 1 as total2 from lineitem |
| // |
| // In this case, total2 will meet the following condition. Then, total2 can |
| // just refer the result of total1 rather than calculating l_orderkey + 1. |
| if (!isPrimaryName(id, name) && isEvaluated(getPrimaryName(id))) { |
| evalNode = new FieldEval(getPrimaryName(id), evalNode.getValueType()); |
| } |
| |
| // if it is a column reference itself, just returns a column reference without any alias. |
| if (evalNode.getType() == EvalType.FIELD && evalNode.getName().equals(name)) { |
| return new Target((FieldEval)evalNode); |
| } else { // otherwise, it returns an expression. |
| return new Target(evalNode, name); |
| } |
| } |
| |
| public boolean isEvaluated(String name) { |
| if (!nameToIdBiMap.containsKey(name)) { |
| throw new RuntimeException("No Such target name: " + name); |
| } |
| int refId = nameToIdBiMap.get(name); |
| return evaluationStateMap.get(refId); |
| } |
| |
| public void markAsEvaluated(Target target) { |
| int refId = nameToIdBiMap.get(target.getCanonicalName()); |
| EvalNode evalNode = target.getEvalTree(); |
| if (!idToNamesMap.containsKey(refId)) { |
| throw new RuntimeException("No such eval: " + evalNode); |
| } |
| evaluationStateMap.put(refId, true); |
| } |
| |
| public Iterator<Target> getFilteredTargets(Set<String> required) { |
| return new FilteredTargetIterator(required); |
| } |
| |
| class FilteredTargetIterator implements Iterator<Target> { |
| List<Target> filtered = TUtil.newList(); |
| Iterator<Target> iterator; |
| |
| public FilteredTargetIterator(Set<String> required) { |
| for (String name : nameToIdBiMap.keySet()) { |
| if (required.contains(name)) { |
| filtered.add(getTarget(name)); |
| } |
| } |
| iterator = filtered.iterator(); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return iterator.hasNext(); |
| } |
| |
| @Override |
| public Target next() { |
| return iterator.next(); |
| } |
| |
| @Override |
| public void remove() { |
| } |
| } |
| |
| public String toString() { |
| int evaluated = 0; |
| for (Boolean flag: evaluationStateMap.values()) { |
| if (flag) { |
| evaluated++; |
| } |
| } |
| return "eval=" + evaluationStateMap.size() + ", evaluated=" + evaluated; |
| } |
| } |
| |
| static class Context { |
| TargetListManager targetListMgr; |
| Set<String> requiredSet; |
| |
| public Context(LogicalPlan plan) { |
| requiredSet = new LinkedHashSet<String>(); |
| targetListMgr = new TargetListManager(plan); |
| } |
| |
| public Context(LogicalPlan plan, Collection<String> requiredSet) { |
| this.requiredSet = new LinkedHashSet<String>(requiredSet); |
| targetListMgr = new TargetListManager(plan); |
| } |
| |
| public Context(Context upperContext) { |
| this.requiredSet = new LinkedHashSet<String>(upperContext.requiredSet); |
| targetListMgr = upperContext.targetListMgr; |
| } |
| |
| public String addExpr(Target target) throws PlanningException { |
| String reference = targetListMgr.add(target); |
| addNecessaryReferences(target.getEvalTree()); |
| return reference; |
| } |
| |
| public String addExpr(EvalNode evalNode) throws PlanningException { |
| String reference = targetListMgr.add(evalNode); |
| addNecessaryReferences(evalNode); |
| return reference; |
| } |
| |
| private void addNecessaryReferences(EvalNode evalNode) { |
| for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) { |
| requiredSet.add(column.getQualifiedName()); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "required=" + requiredSet.size() + "," + targetListMgr.toString(); |
| } |
| } |
| |
| @Override |
| public LogicalNode visitRoot(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| LogicalNode child = super.visitRoot(context, plan, block, node, stack); |
| node.setInSchema(child.getOutSchema()); |
| node.setOutSchema(child.getOutSchema()); |
| return node; |
| } |
| |
| @Override |
| public LogicalNode visitProjection(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, |
| ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| Target [] targets = node.getTargets(); |
| int targetNum = targets.length; |
| String [] referenceNames = new String[targetNum]; |
| for (int i = 0; i < targetNum; i++) { |
| referenceNames[i] = newContext.addExpr(targets[i]); |
| } |
| |
| LogicalNode child = super.visitProjection(newContext, plan, block, node, stack); |
| |
| node.setInSchema(child.getOutSchema()); |
| |
| int evaluationCount = 0; |
| List<Target> finalTargets = TUtil.newList(); |
| for (String referenceName : referenceNames) { |
| Target target = context.targetListMgr.getTarget(referenceName); |
| |
| if (target.getEvalTree().getType() == EvalType.CONST) { |
| finalTargets.add(target); |
| } else if (context.targetListMgr.isEvaluated(referenceName)) { |
| if (context.targetListMgr.isNativeAlias(referenceName)) { |
| String realRefName = context.targetListMgr.getRealReferenceName(referenceName); |
| finalTargets.add(new Target(new FieldEval(realRefName, target.getDataType()), referenceName)); |
| } else { |
| finalTargets.add(new Target(new FieldEval(target.getNamedColumn()))); |
| } |
| } else if (LogicalPlanner.checkIfBeEvaluatedAtThis(target.getEvalTree(), node)) { |
| finalTargets.add(target); |
| context.targetListMgr.markAsEvaluated(target); |
| evaluationCount++; |
| } |
| } |
| |
| node.setTargets(finalTargets.toArray(new Target[finalTargets.size()])); |
| LogicalPlanner.verifyProjectedFields(block, node); |
| |
| // Removing ProjectionNode |
| // TODO - Consider INSERT and CTAS statement, and then remove the check of stack.empty. |
| if (evaluationCount == 0 && PlannerUtil.targetToSchema(finalTargets).equals(child.getOutSchema())) { |
| if (stack.empty()) { |
| // if it is topmost, set it as the root of this block. |
| block.setRoot(child); |
| } else { |
| LogicalNode parentNode = stack.peek(); |
| switch (parentNode.getType()) { |
| case ROOT: |
| LogicalRootNode rootNode = (LogicalRootNode) parentNode; |
| rootNode.setChild(child); |
| rootNode.setInSchema(child.getOutSchema()); |
| rootNode.setOutSchema(child.getOutSchema()); |
| break; |
| case TABLE_SUBQUERY: |
| TableSubQueryNode tableSubQueryNode = (TableSubQueryNode) parentNode; |
| tableSubQueryNode.setSubQuery(child); |
| break; |
| case STORE: |
| StoreTableNode storeTableNode = (StoreTableNode) parentNode; |
| storeTableNode.setChild(child); |
| storeTableNode.setInSchema(child.getOutSchema()); |
| break; |
| case INSERT: |
| InsertNode insertNode = (InsertNode) parentNode; |
| insertNode.setSubQuery(child); |
| break; |
| case CREATE_TABLE: |
| CreateTableNode createTableNode = (CreateTableNode) parentNode; |
| createTableNode.setChild(child); |
| createTableNode.setInSchema(child.getOutSchema()); |
| break; |
| default: |
| throw new PlanningException("Unexpected Parent Node: " + parentNode.getType()); |
| } |
| plan.addHistory("ProjectionNode is eliminated."); |
| } |
| |
| return child; |
| |
| } else { |
| return node; |
| } |
| } |
| |
| public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LimitNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| LogicalNode child = super.visitLimit(context, plan, block, node, stack); |
| |
| node.setInSchema(child.getOutSchema()); |
| node.setOutSchema(child.getOutSchema()); |
| return node; |
| } |
| |
| @Override |
| public LogicalNode visitSort(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, |
| SortNode node, Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| |
| final int sortKeyNum = node.getSortKeys().length; |
| String [] keyNames = new String[sortKeyNum]; |
| for (int i = 0; i < sortKeyNum; i++) { |
| SortSpec sortSpec = node.getSortKeys()[i]; |
| keyNames[i] = newContext.addExpr(new FieldEval(sortSpec.getSortKey())); |
| } |
| |
| LogicalNode child = super.visitSort(newContext, plan, block, node, stack); |
| |
| // it rewrite sortkeys. This rewrite sets right column names and eliminates duplicated sort keys. |
| List<SortSpec> sortSpecs = new ArrayList<SortSpec>(); |
| for (int i = 0; i < keyNames.length; i++) { |
| String sortKey = keyNames[i]; |
| Target target = context.targetListMgr.getTarget(sortKey); |
| if (context.targetListMgr.isEvaluated(sortKey)) { |
| Column c = target.getNamedColumn(); |
| SortSpec sortSpec = new SortSpec(c, node.getSortKeys()[i].isAscending(), node.getSortKeys()[i].isNullFirst()); |
| if (!sortSpecs.contains(sortSpec)) { |
| sortSpecs.add(sortSpec); |
| } |
| } else { |
| if (target.getEvalTree().getType() == EvalType.FIELD) { |
| Column c = ((FieldEval)target.getEvalTree()).getColumnRef(); |
| SortSpec sortSpec = new SortSpec(c, node.getSortKeys()[i].isAscending(), node.getSortKeys()[i].isNullFirst()); |
| if (!sortSpecs.contains(sortSpec)) { |
| sortSpecs.add(sortSpec); |
| } |
| } |
| } |
| } |
| node.setSortSpecs(sortSpecs.toArray(new SortSpec[sortSpecs.size()])); |
| |
| node.setInSchema(child.getOutSchema()); |
| node.setOutSchema(child.getOutSchema()); |
| return node; |
| } |
| |
| @Override |
| public LogicalNode visitHaving(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| String referenceName = newContext.targetListMgr.add(node.getQual()); |
| newContext.addNecessaryReferences(node.getQual()); |
| |
| LogicalNode child = super.visitHaving(newContext, plan, block, node, stack); |
| |
| node.setInSchema(child.getOutSchema()); |
| node.setOutSchema(child.getOutSchema()); |
| |
| Target target = context.targetListMgr.getTarget(referenceName); |
| if (newContext.targetListMgr.isEvaluated(referenceName)) { |
| node.setQual(new FieldEval(target.getNamedColumn())); |
| } else { |
| node.setQual(target.getEvalTree()); |
| newContext.targetListMgr.markAsEvaluated(target); |
| } |
| |
| return node; |
| } |
| |
| public LogicalNode visitWindowAgg(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| |
| if (node.hasPartitionKeys()) { |
| for (Column c : node.getPartitionKeys()) { |
| newContext.addNecessaryReferences(new FieldEval(c)); |
| } |
| } |
| |
| if (node.hasSortSpecs()) { |
| for (SortSpec sortSpec : node.getSortSpecs()) { |
| newContext.addNecessaryReferences(new FieldEval(sortSpec.getSortKey())); |
| } |
| } |
| |
| for (WindowFunctionEval winFunc : node.getWindowFunctions()) { |
| if (winFunc.hasSortSpecs()) { |
| for (SortSpec sortSpec : winFunc.getSortSpecs()) { |
| newContext.addNecessaryReferences(new FieldEval(sortSpec.getSortKey())); |
| } |
| } |
| } |
| |
| |
| int nonFunctionColumnNum = node.getTargets().length - node.getWindowFunctions().length; |
| LinkedHashSet<String> nonFunctionColumns = Sets.newLinkedHashSet(); |
| for (int i = 0; i < nonFunctionColumnNum; i++) { |
| FieldEval fieldEval = (new FieldEval(node.getTargets()[i].getNamedColumn())); |
| nonFunctionColumns.add(newContext.addExpr(fieldEval)); |
| } |
| |
| final String [] aggEvalNames; |
| if (node.hasAggFunctions()) { |
| final int evalNum = node.getWindowFunctions().length; |
| aggEvalNames = new String[evalNum]; |
| for (int evalIdx = 0, targetIdx = nonFunctionColumnNum; targetIdx < node.getTargets().length; evalIdx++, |
| targetIdx++) { |
| Target target = node.getTargets()[targetIdx]; |
| WindowFunctionEval winFunc = node.getWindowFunctions()[evalIdx]; |
| aggEvalNames[evalIdx] = newContext.addExpr(new Target(winFunc, target.getCanonicalName())); |
| } |
| } else { |
| aggEvalNames = null; |
| } |
| |
| // visit a child node |
| LogicalNode child = super.visitWindowAgg(newContext, plan, block, node, stack); |
| |
| node.setInSchema(child.getOutSchema()); |
| |
| List<Target> targets = Lists.newArrayList(); |
| if (nonFunctionColumnNum > 0) { |
| for (String column : nonFunctionColumns) { |
| Target target = context.targetListMgr.getTarget(column); |
| |
| // it rewrite grouping keys. |
| // This rewrite sets right column names and eliminates duplicated grouping keys. |
| if (context.targetListMgr.isEvaluated(column)) { |
| targets.add(new Target(new FieldEval(target.getNamedColumn()))); |
| } else { |
| if (target.getEvalTree().getType() == EvalType.FIELD) { |
| targets.add(target); |
| } |
| } |
| } |
| } |
| |
| // Getting projected targets |
| if (node.hasAggFunctions() && aggEvalNames != null) { |
| WindowFunctionEval [] aggEvals = new WindowFunctionEval[aggEvalNames.length]; |
| int i = 0; |
| for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) { |
| |
| String referenceName = it.next(); |
| Target target = context.targetListMgr.getTarget(referenceName); |
| |
| if (LogicalPlanner.checkIfBeEvaluatedAtWindowAgg(target.getEvalTree(), node)) { |
| aggEvals[i++] = target.getEvalTree(); |
| context.targetListMgr.markAsEvaluated(target); |
| |
| targets.add(new Target(new FieldEval(target.getNamedColumn()))); |
| } |
| } |
| if (aggEvals.length > 0) { |
| node.setWindowFunctions(aggEvals); |
| } |
| } |
| |
| node.setTargets(targets.toArray(new Target[targets.size()])); |
| return node; |
| } |
| |
| public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| |
| // Getting grouping key names |
| final int groupingKeyNum = node.getGroupingColumns().length; |
| LinkedHashSet<String> groupingKeyNames = null; |
| if (groupingKeyNum > 0) { |
| groupingKeyNames = Sets.newLinkedHashSet(); |
| for (int i = 0; i < groupingKeyNum; i++) { |
| FieldEval fieldEval = new FieldEval(node.getGroupingColumns()[i]); |
| groupingKeyNames.add(newContext.addExpr(fieldEval)); |
| } |
| } |
| |
| // Getting eval names |
| |
| final String [] aggEvalNames; |
| if (node.hasAggFunctions()) { |
| final int evalNum = node.getAggFunctions().length; |
| aggEvalNames = new String[evalNum]; |
| for (int evalIdx = 0, targetIdx = groupingKeyNum; targetIdx < node.getTargets().length; evalIdx++, targetIdx++) { |
| Target target = node.getTargets()[targetIdx]; |
| EvalNode evalNode = node.getAggFunctions()[evalIdx]; |
| aggEvalNames[evalIdx] = newContext.addExpr(new Target(evalNode, target.getCanonicalName())); |
| } |
| } else { |
| aggEvalNames = null; |
| } |
| |
| // visit a child node |
| LogicalNode child = super.visitGroupBy(newContext, plan, block, node, stack); |
| |
| node.setInSchema(child.getOutSchema()); |
| |
| List<Target> targets = Lists.newArrayList(); |
| if (groupingKeyNum > 0 && groupingKeyNames != null) { |
| // Restoring grouping key columns |
| final List<Column> groupingColumns = new ArrayList<Column>(); |
| for (String groupingKey : groupingKeyNames) { |
| Target target = context.targetListMgr.getTarget(groupingKey); |
| |
| // it rewrite grouping keys. |
| // This rewrite sets right column names and eliminates duplicated grouping keys. |
| if (context.targetListMgr.isEvaluated(groupingKey)) { |
| Column c = target.getNamedColumn(); |
| if (!groupingColumns.contains(c)) { |
| groupingColumns.add(c); |
| targets.add(new Target(new FieldEval(target.getNamedColumn()))); |
| } |
| } else { |
| if (target.getEvalTree().getType() == EvalType.FIELD) { |
| Column c = ((FieldEval)target.getEvalTree()).getColumnRef(); |
| if (!groupingColumns.contains(c)) { |
| groupingColumns.add(c); |
| targets.add(target); |
| context.targetListMgr.markAsEvaluated(target); |
| } |
| } else { |
| throw new PlanningException("Cannot evaluate this expression in grouping keys: " + target.getEvalTree()); |
| } |
| } |
| } |
| |
| node.setGroupingColumns(groupingColumns.toArray(new Column[groupingColumns.size()])); |
| } |
| |
| // Getting projected targets |
| if (node.hasAggFunctions() && aggEvalNames != null) { |
| AggregationFunctionCallEval [] aggEvals = new AggregationFunctionCallEval[aggEvalNames.length]; |
| int i = 0; |
| for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) { |
| |
| String referenceName = it.next(); |
| Target target = context.targetListMgr.getTarget(referenceName); |
| |
| if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) { |
| aggEvals[i++] = target.getEvalTree(); |
| context.targetListMgr.markAsEvaluated(target); |
| } |
| } |
| if (aggEvals.length > 0) { |
| node.setAggFunctions(aggEvals); |
| } |
| } |
| Target [] finalTargets = buildGroupByTarget(node, targets, aggEvalNames); |
| node.setTargets(finalTargets); |
| |
| LogicalPlanner.verifyProjectedFields(block, node); |
| |
| return node; |
| } |
| |
| public static Target [] buildGroupByTarget(GroupbyNode groupbyNode, @Nullable List<Target> groupingKeyTargets, |
| String [] aggEvalNames) { |
| final int groupingKeyNum = |
| groupingKeyTargets == null ? groupbyNode.getGroupingColumns().length : groupingKeyTargets.size(); |
| final int aggrFuncNum = aggEvalNames != null ? aggEvalNames.length : 0; |
| EvalNode [] aggEvalNodes = groupbyNode.getAggFunctions(); |
| Target [] targets = new Target[groupingKeyNum + aggrFuncNum]; |
| |
| if (groupingKeyTargets != null) { |
| for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) { |
| targets[groupingKeyIdx] = groupingKeyTargets.get(groupingKeyIdx); |
| } |
| } else { |
| for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) { |
| targets[groupingKeyIdx] = new Target(new FieldEval(groupbyNode.getGroupingColumns()[groupingKeyIdx])); |
| } |
| } |
| |
| if (aggEvalNames != null) { |
| for (int aggrFuncIdx = 0, targetIdx = groupingKeyNum; aggrFuncIdx < aggrFuncNum; aggrFuncIdx++, targetIdx++) { |
| targets[targetIdx] = |
| new Target(new FieldEval(aggEvalNames[aggrFuncIdx], aggEvalNodes[aggrFuncIdx].getValueType())); |
| } |
| } |
| |
| return targets; |
| } |
| |
| public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, |
| SelectionNode node, Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| String referenceName = newContext.targetListMgr.add(node.getQual()); |
| newContext.addNecessaryReferences(node.getQual()); |
| |
| LogicalNode child = super.visitFilter(newContext, plan, block, node, stack); |
| |
| node.setInSchema(child.getOutSchema()); |
| node.setOutSchema(child.getOutSchema()); |
| |
| Target target = context.targetListMgr.getTarget(referenceName); |
| if (newContext.targetListMgr.isEvaluated(referenceName)) { |
| node.setQual(new FieldEval(target.getNamedColumn())); |
| } else { |
| node.setQual(target.getEvalTree()); |
| newContext.targetListMgr.markAsEvaluated(target); |
| } |
| |
| return node; |
| } |
| |
| private static void pushDownIfComplexTermInJoinCondition(Context ctx, EvalNode cnf, EvalNode term) |
| throws PlanningException { |
| |
| // If one of both terms in a binary operator is a complex expression, the binary operator will require |
| // multiple phases. In this case, join cannot evaluate a binary operator. |
| // So, we should prevent dividing the binary operator into more subexpressions. |
| if (term.getType() != EvalType.FIELD && !(term instanceof BinaryEval)) { |
| String refName = ctx.addExpr(term); |
| EvalTreeUtil.replace(cnf, term, new FieldEval(refName, term.getValueType())); |
| } |
| } |
| |
| public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| Context newContext = new Context(context); |
| |
| String joinQualReference = null; |
| if (node.hasJoinQual()) { |
| for (EvalNode eachQual : AlgebraicUtil.toConjunctiveNormalFormArray(node.getJoinQual())) { |
| if (eachQual instanceof BinaryEval) { |
| BinaryEval binaryQual = (BinaryEval) eachQual; |
| |
| for (int i = 0; i < 2; i++) { |
| EvalNode term = binaryQual.getExpr(i); |
| pushDownIfComplexTermInJoinCondition(newContext, eachQual, term); |
| } |
| } |
| } |
| |
| joinQualReference = newContext.addExpr(node.getJoinQual()); |
| newContext.addNecessaryReferences(node.getJoinQual()); |
| } |
| |
| String [] referenceNames = null; |
| if (node.hasTargets()) { |
| referenceNames = new String[node.getTargets().length]; |
| int i = 0; |
| for (Iterator<Target> it = getFilteredTarget(node.getTargets(), context.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| referenceNames[i++] = newContext.addExpr(target); |
| } |
| } |
| |
| stack.push(node); |
| LogicalNode left = visit(newContext, plan, block, node.getLeftChild(), stack); |
| LogicalNode right = visit(newContext, plan, block, node.getRightChild(), stack); |
| stack.pop(); |
| |
| Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema()); |
| |
| node.setInSchema(merged); |
| |
| if (node.hasJoinQual()) { |
| Target target = context.targetListMgr.getTarget(joinQualReference); |
| if (newContext.targetListMgr.isEvaluated(joinQualReference)) { |
| throw new PlanningException("Join condition must be evaluated in the proper Join Node: " + joinQualReference); |
| } else { |
| node.setJoinQual(target.getEvalTree()); |
| newContext.targetListMgr.markAsEvaluated(target); |
| } |
| } |
| |
| LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet(); |
| for (Iterator<String> it = getFilteredReferences(context.targetListMgr.getNames(), |
| context.requiredSet); it.hasNext();) { |
| String referenceName = it.next(); |
| Target target = context.targetListMgr.getTarget(referenceName); |
| |
| if (context.targetListMgr.isEvaluated(referenceName)) { |
| Target fieldReference = new Target(new FieldEval(target.getNamedColumn())); |
| if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, fieldReference.getEvalTree(), node, |
| stack.peek().getType() != NodeType.JOIN)) { |
| projectedTargets.add(fieldReference); |
| } |
| } else if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, target.getEvalTree(), node, |
| stack.peek().getType() != NodeType.JOIN)) { |
| projectedTargets.add(target); |
| context.targetListMgr.markAsEvaluated(target); |
| } |
| } |
| |
| node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()])); |
| LogicalPlanner.verifyProjectedFields(block, node); |
| return node; |
| } |
| |
| static Iterator<String> getFilteredReferences(Collection<String> targetNames, Set<String> required) { |
| return new FilteredStringsIterator(targetNames, required); |
| } |
| |
| static Iterator<String> getFilteredReferences(String [] targetNames, Collection<String> required) { |
| return new FilteredStringsIterator(targetNames, required); |
| } |
| |
| static class FilteredStringsIterator implements Iterator<String> { |
| Iterator<String> iterator; |
| |
| FilteredStringsIterator(Collection<String> targetNames, Collection<String> required) { |
| List<String> filtered = TUtil.newList(); |
| for (String name : targetNames) { |
| if (required.contains(name)) { |
| filtered.add(name); |
| } |
| } |
| |
| iterator = filtered.iterator(); |
| } |
| |
| FilteredStringsIterator(String [] targetNames, Collection<String> required) { |
| this(TUtil.newList(targetNames), required); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return iterator.hasNext(); |
| } |
| |
| @Override |
| public String next() { |
| return iterator.next(); |
| } |
| |
| @Override |
| public void remove() { |
| } |
| } |
| |
| static Iterator<Target> getFilteredTarget(Target[] targets, Set<String> required) { |
| return new FilteredIterator(targets, required); |
| } |
| |
| static class FilteredIterator implements Iterator<Target> { |
| Iterator<Target> iterator; |
| |
| FilteredIterator(Target [] targets, Set<String> requiredReferences) { |
| List<Target> filtered = TUtil.newList(); |
| Map<String, Target> targetSet = new HashMap<String, Target>(); |
| for (Target t : targets) { |
| // Only should keep an raw target instead of field reference. |
| if (targetSet.containsKey(t.getCanonicalName())) { |
| Target targetInSet = targetSet.get(t.getCanonicalName()); |
| EvalNode evalNode = targetInSet.getEvalTree(); |
| if (evalNode.getType() == EvalType.FIELD && t.getEvalTree().getType() != EvalType.FIELD) { |
| targetSet.put(t.getCanonicalName(), t); |
| } |
| } else { |
| targetSet.put(t.getCanonicalName(), t); |
| } |
| } |
| |
| for (String name : requiredReferences) { |
| if (targetSet.containsKey(name)) { |
| filtered.add(targetSet.get(name)); |
| } |
| } |
| |
| iterator = filtered.iterator(); |
| } |
| @Override |
| public boolean hasNext() { |
| return iterator.hasNext(); |
| } |
| |
| @Override |
| public Target next() { |
| return iterator.next(); |
| } |
| |
| @Override |
| public void remove() { |
| } |
| } |
| |
| @Override |
| public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| |
| LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild()); |
| LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild()); |
| |
| Context leftContext = new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet, |
| leftBlock.getName())); |
| Context rightContext = new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet, |
| rightBlock.getName())); |
| |
| stack.push(node); |
| visit(leftContext, plan, leftBlock, leftBlock.getRoot(), new Stack<LogicalNode>()); |
| visit(rightContext, plan, rightBlock, rightBlock.getRoot(), new Stack<LogicalNode>()); |
| stack.pop(); |
| return node; |
| } |
| |
| public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| |
| Context newContext = new Context(context); |
| |
| Target [] targets; |
| if (node.hasTargets()) { |
| targets = node.getTargets(); |
| } else { |
| targets = PlannerUtil.schemaToTargets(node.getTableSchema()); |
| } |
| |
| LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet(); |
| for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| newContext.addExpr(target); |
| } |
| |
| for (Iterator<Target> it = context.targetListMgr.getFilteredTargets(newContext.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| |
| if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) { |
| projectedTargets.add(target); |
| newContext.targetListMgr.markAsEvaluated(target); |
| } |
| } |
| |
| node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()])); |
| LogicalPlanner.verifyProjectedFields(block, node); |
| return node; |
| } |
| |
| @Override |
| public LogicalNode visitPartitionedTableScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, |
| PartitionedTableScanNode node, Stack<LogicalNode> stack) |
| throws PlanningException { |
| |
| Context newContext = new Context(context); |
| |
| Target [] targets; |
| if (node.hasTargets()) { |
| targets = node.getTargets(); |
| } else { |
| targets = PlannerUtil.schemaToTargets(node.getOutSchema()); |
| } |
| |
| LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet(); |
| for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| newContext.addExpr(target); |
| } |
| |
| for (Iterator<Target> it = context.targetListMgr.getFilteredTargets(newContext.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| |
| if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) { |
| projectedTargets.add(target); |
| newContext.targetListMgr.markAsEvaluated(target); |
| } |
| } |
| |
| node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()])); |
| LogicalPlanner.verifyProjectedFields(block, node); |
| return node; |
| } |
| |
| @Override |
| public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock block, |
| TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException { |
| Context childContext = new Context(plan, upperContext.requiredSet); |
| stack.push(node); |
| LogicalNode child = super.visitTableSubQuery(childContext, plan, block, node, stack); |
| node.setSubQuery(child); |
| stack.pop(); |
| |
| Target [] targets; |
| if (node.hasTargets()) { |
| targets = node.getTargets(); |
| } else { |
| targets = PlannerUtil.schemaToTargets(node.getOutSchema()); |
| } |
| |
| LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet(); |
| for (Iterator<Target> it = getFilteredTarget(targets, upperContext.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| upperContext.addExpr(target); |
| } |
| |
| for (Iterator<Target> it = upperContext.targetListMgr.getFilteredTargets(upperContext.requiredSet); it.hasNext();) { |
| Target target = it.next(); |
| |
| if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) { |
| projectedTargets.add(target); |
| upperContext.targetListMgr.markAsEvaluated(target); |
| } |
| } |
| |
| node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()])); |
| LogicalPlanner.verifyProjectedFields(block, node); |
| return node; |
| } |
| |
| @Override |
| public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node, |
| Stack<LogicalNode> stack) throws PlanningException { |
| stack.push(node); |
| visit(context, plan, block, node.getChild(), stack); |
| stack.pop(); |
| return node; |
| } |
| } |