| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.engine.planner; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.ObjectArrays; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.tajo.SessionVars; |
| import org.apache.tajo.catalog.Column; |
| import org.apache.tajo.catalog.SortSpec; |
| import org.apache.tajo.catalog.proto.CatalogProtos; |
| import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.engine.planner.enforce.Enforcer; |
| import org.apache.tajo.engine.planner.global.DataChannel; |
| import org.apache.tajo.engine.planner.physical.*; |
| import org.apache.tajo.engine.query.QueryContext; |
| import org.apache.tajo.exception.TajoInternalError; |
| import org.apache.tajo.plan.LogicalPlan; |
| import org.apache.tajo.plan.logical.*; |
| import org.apache.tajo.plan.serder.LogicalNodeDeserializer; |
| import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer; |
| import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; |
| import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; |
| import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray; |
| import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; |
| import org.apache.tajo.plan.serder.PlanProto.SortEnforce; |
| import org.apache.tajo.plan.serder.PlanProto.SortedInputEnforce; |
| import org.apache.tajo.plan.util.PlannerUtil; |
| import org.apache.tajo.storage.FileTablespace; |
| import org.apache.tajo.storage.StorageConstants; |
| import org.apache.tajo.storage.TablespaceManager; |
| import org.apache.tajo.storage.fragment.FileFragment; |
| import org.apache.tajo.storage.fragment.Fragment; |
| import org.apache.tajo.storage.fragment.FragmentConvertor; |
| import org.apache.tajo.unit.StorageUnit; |
| import org.apache.tajo.util.FileUtil; |
| import org.apache.tajo.util.StringUtils; |
| import org.apache.tajo.util.TUtil; |
| import org.apache.tajo.worker.TaskAttemptContext; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Stack; |
| |
| import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; |
| import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; |
| import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; |
| import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType; |
| import static org.apache.tajo.plan.serder.PlanProto.GroupbyEnforce.GroupbyAlgorithm; |
| import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; |
| |
| public class PhysicalPlannerImpl implements PhysicalPlanner { |
  private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
  // Sentinel PID for logical nodes that have not been assigned a plan id yet.
  private static final int UNGENERATED_PID = -1;

  // Tajo system configuration shared by all planning routines of this planner.
  protected final TajoConf conf;

  public PhysicalPlannerImpl(final TajoConf conf) {
    this.conf = conf;
  }
| |
| public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan) { |
| |
| PhysicalExec execPlan; |
| |
| try { |
| execPlan = createPlanRecursive(context, logicalPlan, new Stack<>()); |
| if (execPlan instanceof StoreTableExec |
| || execPlan instanceof RangeShuffleFileWriteExec |
| || execPlan instanceof HashShuffleFileWriteExec |
| || execPlan instanceof ColPartitionStoreExec) { |
| return execPlan; |
| } else if (context.getDataChannel() != null) { |
| return buildOutputOperator(context, logicalPlan, execPlan); |
| } else { |
| return execPlan; |
| } |
| } catch (IOException ioe) { |
| throw new TajoInternalError(ioe); |
| } |
| } |
| |
| private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan, |
| PhysicalExec execPlan) throws IOException { |
| DataChannel channel = context.getDataChannel(); |
| ShuffleFileWriteNode shuffleFileWriteNode = LogicalPlan.createNodeWithoutPID(ShuffleFileWriteNode.class); |
| shuffleFileWriteNode.setDataFormat(context.getDataChannel().getDataFormat()); |
| shuffleFileWriteNode.setInSchema(plan.getOutSchema()); |
| shuffleFileWriteNode.setOutSchema(plan.getOutSchema()); |
| shuffleFileWriteNode.setShuffle(channel.getShuffleType(), channel.getShuffleKeys(), channel.getShuffleOutputNum()); |
| shuffleFileWriteNode.setChild(plan); |
| |
| PhysicalExec outExecPlan = createShuffleFileWritePlan(context, shuffleFileWriteNode, execPlan); |
| return outExecPlan; |
| } |
| |
  /**
   * Recursively translates a logical node and its descendants into a tree of
   * physical executors.
   *
   * @param ctx         task attempt context handed to every created executor
   * @param logicalNode root of the logical subtree to translate
   * @param stack       ancestors of {@code logicalNode}, maintained during traversal
   * @return the physical executor for the subtree, or null for an unhandled node type
   *         (NOTE(review): returning null here likely surfaces as an NPE at the
   *         caller — consider throwing; left unchanged to preserve behavior)
   */
  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode, Stack<LogicalNode> stack)
      throws IOException {
    PhysicalExec leftExec;
    PhysicalExec rightExec;

    switch (logicalNode.getType()) {

    case ROOT:
      // The root produces no executor of its own; translate its single child.
      LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
      stack.push(rootNode);
      leftExec = createPlanRecursive(ctx, rootNode.getChild(), stack);
      stack.pop();
      return leftExec;

    case EXPRS:
      // Expression-only evaluation (query with no input relation).
      EvalExprNode evalExpr = (EvalExprNode) logicalNode;
      return new EvalExprExec(ctx, evalExpr);

    case CREATE_TABLE:
    case INSERT:
    case STORE:
      // All three store variants share the same store-plan construction.
      StoreTableNode storeNode = (StoreTableNode) logicalNode;
      stack.push(storeNode);
      leftExec = createPlanRecursive(ctx, storeNode.getChild(), stack);
      stack.pop();
      return createStorePlan(ctx, storeNode, leftExec);

    case SELECTION:
      SelectionNode selNode = (SelectionNode) logicalNode;
      stack.push(selNode);
      leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
      stack.pop();

      return new SelectionExec(ctx, selNode, leftExec);

    case PROJECTION:
      ProjectionNode prjNode = (ProjectionNode) logicalNode;
      stack.push(prjNode);
      leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
      stack.pop();

      return new ProjectionExec(ctx, prjNode, leftExec);

    case TABLE_SUBQUERY: {
      // A table subquery is executed as a projection over its inner query plan.
      TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
      stack.push(subQueryNode);
      leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery(), stack);
      stack.pop();
      return new ProjectionExec(ctx, subQueryNode, leftExec);
    }

    case PARTITIONS_SCAN:
    case SCAN:
      // Leaf node: scans need no child translation.
      leftExec = createScanPlan(ctx, (ScanNode) logicalNode, stack);
      return leftExec;

    case GROUP_BY:
      GroupbyNode grpNode = (GroupbyNode) logicalNode;
      stack.push(grpNode);
      leftExec = createPlanRecursive(ctx, grpNode.getChild(), stack);
      stack.pop();
      return createGroupByPlan(ctx, grpNode, leftExec);

    case WINDOW_AGG:
      WindowAggNode windowAggNode = (WindowAggNode) logicalNode;
      stack.push(windowAggNode);
      leftExec = createPlanRecursive(ctx, windowAggNode.getChild(), stack);
      stack.pop();
      return createWindowAgg(ctx, windowAggNode, leftExec);

    case DISTINCT_GROUP_BY:
      DistinctGroupbyNode distinctNode = (DistinctGroupbyNode) logicalNode;
      stack.push(distinctNode);
      leftExec = createPlanRecursive(ctx, distinctNode.getChild(), stack);
      stack.pop();
      return createDistinctGroupByPlan(ctx, distinctNode, leftExec);

    case HAVING:
      HavingNode havingNode = (HavingNode) logicalNode;
      stack.push(havingNode);
      leftExec = createPlanRecursive(ctx, havingNode.getChild(), stack);
      stack.pop();
      return new HavingExec(ctx, havingNode, leftExec);

    case SORT:
      SortNode sortNode = (SortNode) logicalNode;
      stack.push(sortNode);
      leftExec = createPlanRecursive(ctx, sortNode.getChild(), stack);
      stack.pop();
      return createSortPlan(ctx, sortNode, leftExec);

    case JOIN:
      // Binary node: translate both children before selecting a join algorithm.
      JoinNode joinNode = (JoinNode) logicalNode;
      stack.push(joinNode);
      leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
      rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
      stack.pop();

      return createJoinPlan(ctx, joinNode, leftExec, rightExec);

    case UNION:
      UnionNode unionNode = (UnionNode) logicalNode;
      stack.push(unionNode);
      leftExec = createPlanRecursive(ctx, unionNode.getLeftChild(), stack);
      rightExec = createPlanRecursive(ctx, unionNode.getRightChild(), stack);
      stack.pop();
      return new UnionExec(ctx, leftExec, rightExec);

    case LIMIT:
      LimitNode limitNode = (LimitNode) logicalNode;
      stack.push(limitNode);
      leftExec = createPlanRecursive(ctx, limitNode.getChild(), stack);
      stack.pop();
      return new LimitExec(ctx, limitNode.getInSchema(),
          limitNode.getOutSchema(), leftExec, limitNode);

    case INDEX_SCAN:
      // Leaf node: index scans need no child translation.
      IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
      leftExec = createIndexScanExec(ctx, indexScanNode);
      return leftExec;

    case CREATE_INDEX:
      CreateIndexNode createIndexNode = (CreateIndexNode) logicalNode;
      stack.push(createIndexNode);
      leftExec = createPlanRecursive(ctx, createIndexNode.getChild(), stack);
      stack.pop();
      return new StoreIndexExec(ctx, createIndexNode, leftExec);

    default:
      return null;
    }
  }
| |
| @VisibleForTesting |
| public static long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) { |
| long size = 0; |
| for (String tableId : tableIds) { |
| FragmentProto[] fragmentProtos = ctx.getTables(tableId); |
| List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos); |
| for (Fragment frag : fragments) { |
| size += TablespaceManager.guessFragmentVolume(ctx.getConf(), frag); |
| } |
| } |
| return size; |
| } |
| |
| private boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left) |
| throws IOException { |
| String [] lineage = PlannerUtil.getRelationLineage(node); |
| long volume = estimateSizeRecursive(context, lineage); |
| return checkIfInMemoryInnerJoinIsPossible(context, lineage, volume, left); |
| } |
| |
| @VisibleForTesting |
| public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, String [] lineage, long tableVolume, |
| boolean left) throws IOException { |
| boolean inMemoryInnerJoinFlag; |
| |
| QueryContext queryContext = context.getQueryContext(); |
| |
| if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) { |
| inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT) |
| * StorageUnit.MB; |
| } else { |
| inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) |
| * StorageUnit.MB; |
| } |
| |
| LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.", |
| context.getTaskId().toString(), |
| (left ? "Left" : "Right"), |
| StringUtils.join(lineage), |
| FileUtil.humanReadableByteCount(tableVolume, false), |
| (inMemoryInnerJoinFlag ? "" : "not "))); |
| return inMemoryInnerJoinFlag; |
| } |
| |
  /**
   * Dispatches join-plan creation to the algorithm-selection routine matching
   * the join type of the given node.
   *
   * @throws PhysicalPlanningException if the join type has no physical implementation
   */
  public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
                                     PhysicalExec rightExec) throws IOException {

    switch (joinNode.getJoinType()) {
    case CROSS:
      return createCrossJoinPlan(context, joinNode, leftExec, rightExec);

    case INNER:
      return createInnerJoinPlan(context, joinNode, leftExec, rightExec);

    case LEFT_OUTER:
      return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);

    case RIGHT_OUTER:
      return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);

    case FULL_OUTER:
      return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);

    case LEFT_SEMI:
      return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);

    case RIGHT_SEMI:
      return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);

    case LEFT_ANTI:
      return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);

    case RIGHT_ANTI:
      return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);

    default:
      throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
    }
  }
| |
| private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan); |
| |
| if (property != null) { |
| JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); |
| |
| switch (algorithm) { |
| default: |
| // fallback algorithm |
| LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name()); |
| PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec); |
| return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]); |
| } |
| |
| } else { |
| LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]"); |
| // returns two PhysicalExec. smaller one is 0, and larger one is 1. |
| PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec); |
| return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]); |
| } |
| } |
| |
  /**
   * Creates the physical executor for an inner join. An algorithm enforced by the
   * query master is honored; otherwise the best algorithm is chosen from size estimates.
   */
  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
    Enforcer enforcer = context.getEnforcer();
    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);

    if (property != null) {
      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();

      switch (algorithm) {
      case IN_MEMORY_HASH_JOIN:
        LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
        // returns two PhysicalExec. smaller one is 0, and larger one is 1.
        PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
        return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
      case MERGE_JOIN:
        LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
        return createMergeInnerJoin(context, plan, leftExec, rightExec);
      case HYBRID_HASH_JOIN:
        // NOTE(review): hybrid hash join has no implementation here; this case
        // falls through to the merge-join fallback below — confirm intended.

      default:
        LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
        LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
        return createMergeInnerJoin(context, plan, leftExec, rightExec);
      }
    } else {
      // No enforced algorithm: choose based on estimated input volumes.
      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
    }
  }
| |
| /** |
| * It returns two {@link org.apache.tajo.engine.planner.physical.PhysicalExec}s sorted in an ascending order of |
| * their child relations' total volume. In other words, the smaller side is returned as 0's PhysicalExec, and |
| * the larger side is returned as 1's PhysicalExec. |
| */ |
| @VisibleForTesting |
| public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec left, PhysicalExec right) throws IOException { |
| String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild()); |
| String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild()); |
| long leftSize = estimateSizeRecursive(context, leftLineage); |
| long rightSize = estimateSizeRecursive(context, rightLineage); |
| |
| PhysicalExec smaller; |
| PhysicalExec larger; |
| if (leftSize <= rightSize) { |
| smaller = left; |
| larger = right; |
| LOG.info(String.format("[%s] Left relations %s (%s) is smaller than Right relations %s (%s).", |
| context.getTaskId().toString(), |
| StringUtils.join(leftLineage), |
| FileUtil.humanReadableByteCount(leftSize, false), |
| StringUtils.join(rightLineage), |
| FileUtil.humanReadableByteCount(rightSize, false))); |
| } else { |
| smaller = right; |
| larger = left; |
| LOG.info(String.format("[%s] Right relations %s (%s) is smaller than Left relations %s (%s).", |
| context.getTaskId().toString(), |
| StringUtils.join(rightLineage), |
| FileUtil.humanReadableByteCount(rightSize, false), |
| StringUtils.join(leftLineage), |
| FileUtil.humanReadableByteCount(leftSize, false))); |
| } |
| |
| return new PhysicalExec [] {smaller, larger}; |
| } |
| |
| private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| boolean inMemoryHashJoin = false; |
| if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true) |
| || checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) { |
| inMemoryHashJoin = true; |
| } |
| |
| if (inMemoryHashJoin) { |
| LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]"); |
| // returns two PhysicalExec. smaller one is 0, and larger one is 1. |
| PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec); |
| return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]); |
| } else { |
| return createMergeInnerJoin(context, plan, leftExec, rightExec); |
| } |
| } |
| |
| private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual( |
| plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema()); |
| |
| SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| leftSortNode.setSortSpecs(sortSpecs[0]); |
| leftSortNode.setInSchema(leftExec.getSchema()); |
| leftSortNode.setOutSchema(leftExec.getSchema()); |
| ExternalSortExec outerSort = new ExternalSortExec(context, leftSortNode, leftExec); |
| |
| SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| rightSortNode.setSortSpecs(sortSpecs[1]); |
| rightSortNode.setInSchema(rightExec.getSchema()); |
| rightSortNode.setOutSchema(rightExec.getSchema()); |
| ExternalSortExec innerSort = new ExternalSortExec(context, rightSortNode, rightExec); |
| |
| LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]"); |
| return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]); |
| } |
| |
  /**
   * Creates the physical executor for a left outer join, honoring an enforced
   * algorithm when present.
   */
  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
    Enforcer enforcer = context.getEnforcer();
    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
    if (property != null) {
      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
      switch (algorithm) {
      case IN_MEMORY_HASH_JOIN:
        LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
        return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
      case MERGE_JOIN:
        //the right operand is too large, so we opt for merge join implementation
        // Note the swapped operands: a left outer join is executed as a right
        // outer merge join with left/right exchanged.
        LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
        return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
      default:
        LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
        LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
        return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
      }
    } else {
      return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
    }
  }
| |
| private static boolean isHashOuterJoinFeasible(TaskAttemptContext context, LogicalNode innerRelation) { |
| String [] rightLineage = PlannerUtil.getRelationLineage(innerRelation); |
| long estimatedVolume = estimateSizeRecursive(context, rightLineage); |
| |
| QueryContext queryContext = context.getQueryContext(); |
| |
| if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) { |
| return estimatedVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB; |
| } else { |
| return estimatedVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB; |
| } |
| } |
| |
| private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| |
| if (isHashOuterJoinFeasible(context, plan.getRightChild())) { |
| // we can implement left outer join using hash join, using the right operand as the build relation |
| LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join]."); |
| return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); |
| } |
| else { |
| //the right operand is too large, so we opt for merge join implementation |
| LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join]."); |
| return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec); |
| } |
| } |
| |
| private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note: |
| // blocking, but merge join is blocking as well) |
| if (isHashOuterJoinFeasible(context, plan.getLeftChild())){ |
| LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join]."); |
| return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec); |
| } else { |
| return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec); |
| } |
| } |
| |
| private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| //the left operand is too large, so opt for merge join implementation |
| LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join]."); |
| SortSpec[][] sortSpecs2 = PlannerUtil.getSortKeysFromJoinQual( |
| plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema()); |
| |
| SortNode leftSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| leftSortNode2.setSortSpecs(sortSpecs2[0]); |
| leftSortNode2.setInSchema(leftExec.getSchema()); |
| leftSortNode2.setOutSchema(leftExec.getSchema()); |
| ExternalSortExec outerSort2 = new ExternalSortExec(context, leftSortNode2, leftExec); |
| |
| SortNode rightSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| rightSortNode2.setSortSpecs(sortSpecs2[1]); |
| rightSortNode2.setInSchema(rightExec.getSchema()); |
| rightSortNode2.setOutSchema(rightExec.getSchema()); |
| ExternalSortExec innerSort2 = new ExternalSortExec(context, rightSortNode2, rightExec); |
| |
| return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]); |
| } |
| |
  /**
   * Creates the physical executor for a right outer join, honoring an enforced
   * algorithm when present.
   */
  private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
    Enforcer enforcer = context.getEnforcer();
    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
    if (property != null) {
      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
      switch (algorithm) {
      case IN_MEMORY_HASH_JOIN:
        // Executed as a left outer hash join with exchanged operands.
        LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
        return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
      case MERGE_JOIN:
        return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
      default:
        LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
        LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
        return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
      }
    } else {
      return createBestRightJoinPlan(context, plan, leftExec, rightExec);
    }
  }
| |
  /**
   * Creates the physical executor for a full outer join, honoring an enforced
   * algorithm when present.
   */
  private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
    Enforcer enforcer = context.getEnforcer();
    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
    if (property != null) {
      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
      switch (algorithm) {
      case IN_MEMORY_HASH_JOIN:
        return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);

      case MERGE_JOIN:
        return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);

      default:
        LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
        LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
        return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
      }
    } else {
      return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
    }
  }
| |
| private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) |
| throws IOException { |
| String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild()); |
| String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild()); |
| long outerSize2 = estimateSizeRecursive(context, leftLineage); |
| long innerSize2 = estimateSizeRecursive(context, rightLineage); |
| |
| PhysicalExec selectedRight; |
| PhysicalExec selectedLeft; |
| |
| // HashJoinExec loads the smaller relation to memory. |
| if (outerSize2 <= innerSize2) { |
| selectedLeft = leftExec; |
| selectedRight = rightExec; |
| } else { |
| selectedLeft = rightExec; |
| selectedRight = leftExec; |
| } |
| LOG.info("Full Outer Join (" + plan.getPID() + ") chooses [Hash Join]"); |
| return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft); |
| } |
| |
| private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) |
| throws IOException { |
| // if size too large, full outer merge join implementation |
| LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]"); |
| SortSpec[][] sortSpecs3 = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(), |
| leftExec.getSchema(), rightExec.getSchema()); |
| |
| SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| leftSortNode.setSortSpecs(sortSpecs3[0]); |
| leftSortNode.setInSchema(leftExec.getSchema()); |
| leftSortNode.setOutSchema(leftExec.getSchema()); |
| ExternalSortExec outerSort3 = new ExternalSortExec(context, leftSortNode, leftExec); |
| |
| SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| rightSortNode.setSortSpecs(sortSpecs3[1]); |
| rightSortNode.setInSchema(rightExec.getSchema()); |
| rightSortNode.setOutSchema(rightExec.getSchema()); |
| ExternalSortExec innerSort3 = new ExternalSortExec(context, rightSortNode, rightExec); |
| |
| return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]); |
| } |
| |
| private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| // The inner relation is always expected to be smaller than the outer relation. |
| // (See GreedyHeuristicJoinOrderAlgorithm:::swapLeftAndRightIfNecessary(). |
| // Thus, we need to evaluate only that the right table is able to be loaded or not. |
| if (isHashOuterJoinFeasible(context, plan.getRightChild())) { |
| return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec); |
| } else { |
| return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec); |
| } |
| } |
| |
| /** |
| * Left semi join means that the left side is the IN side table, and the right side is the FROM side table. |
| */ |
| private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan); |
| if (property != null) { |
| JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); |
| switch (algorithm) { |
| case IN_MEMORY_HASH_JOIN: |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec); |
| |
| default: |
| // TODO: implement sort-based semi join operator |
| LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name()); |
| LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name()); |
| return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec); |
| } |
| } else { |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec); |
| } |
| } |
| |
| /** |
| * Left semi join means that the left side is the FROM side table, and the right side is the IN side table. |
| */ |
| private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan); |
| if (property != null) { |
| JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); |
| switch (algorithm) { |
| case IN_MEMORY_HASH_JOIN: |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec); |
| |
| default: |
| // TODO: implement sort-based semi join operator |
| LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name()); |
| LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name()); |
| return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec); |
| } |
| } else { |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec); |
| } |
| } |
| |
| /** |
| * Left semi join means that the left side is the FROM side table, and the right side is the IN side table. |
| */ |
| private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan); |
| if (property != null) { |
| JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); |
| switch (algorithm) { |
| case IN_MEMORY_HASH_JOIN: |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec); |
| |
| default: |
| // TODO: implement sort-based anti join operator |
| LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name()); |
| LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name()); |
| return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec); |
| } |
| } else { |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec); |
| } |
| } |
| |
| /** |
| * Left semi join means that the left side is the FROM side table, and the right side is the IN side table. |
| */ |
| private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan, |
| PhysicalExec leftExec, PhysicalExec rightExec) throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan); |
| if (property != null) { |
| JoinAlgorithm algorithm = property.getJoin().getAlgorithm(); |
| switch (algorithm) { |
| case IN_MEMORY_HASH_JOIN: |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec); |
| |
| default: |
| // TODO: implement sort-based anti join operator |
| LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name()); |
| LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name()); |
| return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec); |
| } |
| } else { |
| LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join]."); |
| return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec); |
| } |
| } |
| |
| |
| /** |
| * Create a shuffle file write executor to store intermediate data into local disks. |
| */ |
| public PhysicalExec createShuffleFileWritePlan(TaskAttemptContext ctx, |
| ShuffleFileWriteNode plan, PhysicalExec subOp) throws IOException { |
| plan.getOptions().set(StorageConstants.SHUFFLE_TYPE, |
| PlannerUtil.getShuffleType(ctx.getDataChannel().getShuffleType())); |
| |
| switch (plan.getShuffleType()) { |
| case HASH_SHUFFLE: |
| case SCATTERED_HASH_SHUFFLE: |
| return new HashShuffleFileWriteExec(ctx, plan, subOp); |
| |
| case RANGE_SHUFFLE: |
| SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class); |
| |
| SortSpec [] sortSpecs = null; |
| if (sortExec != null) { |
| sortSpecs = sortExec.getSortSpecs(); |
| } else { |
| Column[] columns = ctx.getDataChannel().getShuffleKeys(); |
| SortSpec specs[] = new SortSpec[columns.length]; |
| for (int i = 0; i < columns.length; i++) { |
| specs[i] = new SortSpec(columns[i]); |
| } |
| } |
| return new RangeShuffleFileWriteExec(ctx, plan, subOp, sortSpecs); |
| |
| case NONE_SHUFFLE: |
| // if there is no given NULL CHAR property in the table property and the query is neither CTAS or INSERT, |
| // we set DEFAULT NULL CHAR to the table property. |
| if (!ctx.getQueryContext().containsKey(SessionVars.NULL_CHAR)) { |
| plan.getOptions().set(StorageConstants.TEXT_NULL, TajoConf.ConfVars.$TEXT_NULL.defaultVal); |
| } |
| return new StoreTableExec(ctx, plan, subOp); |
| |
| default: |
| throw new IllegalStateException(ctx.getDataChannel().getShuffleType() + " is not supported yet."); |
| } |
| } |
| |
| /** |
| * Create a executor to store a table into HDFS. This is used for CREATE TABLE .. |
| * AS or INSERT (OVERWRITE) INTO statement. |
| */ |
| public PhysicalExec createStorePlan(TaskAttemptContext ctx, |
| StoreTableNode plan, PhysicalExec subOp) throws IOException { |
| |
| if (plan.getPartitionMethod() != null) { |
| switch (plan.getPartitionMethod().getPartitionType()) { |
| case COLUMN: |
| return createColumnPartitionStorePlan(ctx, plan, subOp); |
| default: |
| throw new IllegalStateException(plan.getPartitionMethod().getPartitionType() + " is not supported yet."); |
| } |
| } else { |
| return new StoreTableExec(ctx, plan, subOp); |
| } |
| } |
| |
| private PhysicalExec createColumnPartitionStorePlan(TaskAttemptContext context, |
| StoreTableNode storeTableNode, |
| PhysicalExec child) throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, storeTableNode); |
| if (property != null) { |
| ColumnPartitionAlgorithm algorithm = property.getColumnPartition().getAlgorithm(); |
| switch (algorithm) { |
| case HASH_PARTITION: |
| return createHashColumnPartitionStorePlan(context, storeTableNode, child); |
| case SORT_PARTITION: // default algorithm |
| default: |
| return createSortBasedColumnPartitionStorePlan(context, storeTableNode, child); |
| } |
| } else { // default algorithm is sorted-based column partition |
| return createSortBasedColumnPartitionStorePlan(context, storeTableNode, child); |
| } |
| } |
| |
| private PhysicalExec createHashColumnPartitionStorePlan(TaskAttemptContext context, |
| StoreTableNode storeTableNode, |
| PhysicalExec child) throws IOException { |
| LOG.info("The planner chooses [Hash-based Column Partitioned Store] algorithm"); |
| return new HashBasedColPartitionStoreExec(context, storeTableNode, child); |
| } |
| |
| private PhysicalExec createSortBasedColumnPartitionStorePlan(TaskAttemptContext context, |
| StoreTableNode storeTableNode, |
| PhysicalExec child) throws IOException { |
| |
| Column[] partitionKeyColumns = storeTableNode.getPartitionMethod().getExpressionSchema().toArray(); |
| SortSpec[] sortSpecs = new SortSpec[partitionKeyColumns.length]; |
| |
| if (storeTableNode.getType() == NodeType.INSERT) { |
| InsertNode insertNode = (InsertNode) storeTableNode; |
| for (int i = 0; i < partitionKeyColumns.length; i++) { |
| for (Column column : partitionKeyColumns) { |
| int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName()); |
| sortSpecs[i++] = new SortSpec(insertNode.getProjectedSchema().getColumn(id), true, false); |
| } |
| } |
| } else if (storeTableNode.getType() == NodeType.CREATE_TABLE) { |
| int i = 0; |
| for (int j = 0; j < partitionKeyColumns.length; j++) { |
| int id = storeTableNode.getOutSchema().getRootColumns().size() + j; |
| Column column = storeTableNode.getInSchema().getColumn(id); |
| sortSpecs[i++] = new SortSpec(column, true, false); |
| } |
| |
| } else { |
| for (int i = 0; i < partitionKeyColumns.length; i++) { |
| sortSpecs[i] = new SortSpec(partitionKeyColumns[i], true, false); |
| } |
| } |
| |
| SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| sortNode.setSortSpecs(sortSpecs); |
| sortNode.setInSchema(child.getSchema()); |
| sortNode.setOutSchema(child.getSchema()); |
| |
| ExternalSortExec sortExec = new ExternalSortExec(context, sortNode, child); |
| LOG.info("The planner chooses [Sort-based Column Partitioned Store] algorithm"); |
| return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec); |
| } |
| |
| private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) { |
| Enforcer enforcer = ctx.getEnforcer(); |
| List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT); |
| if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) { |
| SortNode sortNode = (SortNode) node.peek(); |
| SortedInputEnforce sortEnforcer = property.get(0).getSortedInput(); |
| |
| boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName()); |
| SortSpec [] sortSpecs = LogicalNodeDeserializer.convertSortSpecs(sortEnforcer.getSortSpecsList()); |
| return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs); |
| } else { |
| return false; |
| } |
| } |
| |
  /**
   * Creates a scan executor for the given scan node.
   *
   * When the input is already sorted in the order required by the sort node directly
   * above the scan, the sort is created here instead of at sort planning (see
   * createSortPlan). Otherwise a sequential scan is created; a broadcasted
   * partitioned table is split into local fragments and read by a partition-merge scan.
   *
   * @param ctx the task attempt context holding assigned fragments
   * @param scanNode the scan node to be planned
   * @param node the stack of ancestor logical nodes; node.peek() is the parent
   * @return a scan (or sort-over-scan) executor
   * @throws IOException if the executor cannot be created
   */
  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
      throws IOException {
    // check if an input is sorted in the same order to the subsequence sort operator.
    if (checkIfSortEquivalance(ctx, scanNode, node)) {
      if (ctx.getTable(scanNode.getCanonicalName()) == null) {
        // no fragments were assigned to this task for the table
        return new SeqScanExec(ctx, scanNode, null);
      }
      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
      // the input already satisfies the parent's sort order, so the sort executor
      // is created here over the scan (see the note in createSortPlan)
      return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments);
    } else {
      Enforcer enforcer = ctx.getEnforcer();

      // check if this table is broadcasted one or not.
      boolean broadcastFlag = false;
      if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
        List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
        for (EnforceProperty property : properties) {
          broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
        }
      }

      if (scanNode instanceof PartitionedTableScanNode
          && ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
          ((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {

        if (broadcastFlag) {
          // broadcasted partitioned table: split every input path into file
          // fragments locally instead of using pre-assigned fragments
          PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
          List<Fragment> fileFragments = new ArrayList<>();

          FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri());
          for (Path path : partitionedTableScanNode.getInputPaths()) {
            fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path)));
          }

          FragmentProto[] fragments =
              FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()]));

          // register the locally-built fragments so the task context can serve later lookups
          ctx.addFragments(scanNode.getCanonicalName(), fragments);
          return new PartitionMergeScanExec(ctx, scanNode, fragments);
        }
      }

      if (ctx.getTable(scanNode.getCanonicalName()) == null) {
        // no fragments were assigned to this task for the table
        return new SeqScanExec(ctx, scanNode, null);
      }
      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
      return new SeqScanExec(ctx, scanNode, fragments);
    }
  }
| |
| public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp) |
| throws IOException { |
| |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, groupbyNode); |
| if (property != null) { |
| GroupbyAlgorithm algorithm = property.getGroupby().getAlgorithm(); |
| if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) { |
| return createInMemoryHashAggregation(context, groupbyNode, subOp); |
| } else { |
| return createSortAggregation(context, property, groupbyNode, subOp); |
| } |
| } |
| return createBestAggregationPlan(context, groupbyNode, subOp); |
| } |
| |
| private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp) |
| throws IOException { |
| LOG.info("The planner chooses [Hash Aggregation]"); |
| return new HashAggregateExec(ctx, groupbyNode, subOp); |
| } |
| |
  /**
   * Builds a sort-based aggregation: an external sort on the grouping columns
   * (plus any extra sort keys required by the enforcer) followed by a sort
   * aggregate executor.
   *
   * @param ctx the task attempt context
   * @param property the group-by enforce property; may be null (no extra sort keys)
   * @param groupbyNode the group-by node to be planned
   * @param subOp the child executor
   * @return a sort aggregate executor over an external sort of the child
   * @throws IOException if the executor cannot be created
   */
  private PhysicalExec createSortAggregation(TaskAttemptContext ctx, EnforceProperty property, GroupbyNode groupbyNode,
                                             PhysicalExec subOp) throws IOException {

    // Base sort order: every grouping column, ascending, nulls not first.
    Column[] grpColumns = groupbyNode.getGroupingColumns();
    SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
    for (int i = 0; i < grpColumns.length; i++) {
      sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
    }

    if (property != null) {
      // Append the enforcer's sort keys that are not already grouping columns.
      List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();

      List<SortSpec> enforcedSortSpecList = Lists.newArrayList();
      int i = 0; // NOTE(review): unused local left in place
      outer:
      for (SortSpecProto sortSpecProto : sortSpecProtos) {
        SortSpec enforcedSortSpecs = new SortSpec(sortSpecProto);

        for (Column grpKey : grpColumns) { // if this sort key is included in grouping columns, skip it.
          if (enforcedSortSpecs.getSortKey().equals(grpKey)) {
            continue outer;
          }
        }

        enforcedSortSpecList.add(enforcedSortSpecs);
      }

      // Final order: grouping columns first, then the surviving enforced keys.
      sortSpecs = ObjectArrays.concat(sortSpecs, TUtil.toArray(enforcedSortSpecList, SortSpec.class), SortSpec.class);
    }

    // Sort the child on the combined keys, then aggregate the sorted stream.
    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
    sortNode.setSortSpecs(sortSpecs);
    sortNode.setInSchema(subOp.getSchema());
    sortNode.setOutSchema(subOp.getSchema());
    ExternalSortExec sortExec = new ExternalSortExec(ctx, sortNode, subOp);
    LOG.info("The planner chooses [Sort Aggregation] in (" + StringUtils.join(sortSpecs) + ")");
    return new SortAggregateExec(ctx, groupbyNode, sortExec);
  }
| |
| private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode, |
| PhysicalExec subOp) throws IOException { |
| Column[] grpColumns = groupbyNode.getGroupingColumns(); |
| if (grpColumns.length == 0) { |
| return createInMemoryHashAggregation(context, groupbyNode, subOp); |
| } |
| |
| String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild()); |
| long estimatedSize = estimateSizeRecursive(context, outerLineage); |
| final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT) * StorageUnit.MB; |
| |
| // if the relation size is less than the threshold, |
| // the hash aggregation will be used. |
| LOG.info("Aggregation:estimatedSize=" + estimatedSize + ", threshold=" + threshold); |
| if (estimatedSize <= threshold) { |
| LOG.info("The planner chooses [Hash Aggregation]"); |
| return createInMemoryHashAggregation(context, groupbyNode, subOp); |
| } else { |
| return createSortAggregation(context, null, groupbyNode, subOp); |
| } |
| } |
| |
| public PhysicalExec createWindowAgg(TaskAttemptContext context,WindowAggNode windowAggNode, PhysicalExec subOp) |
| throws IOException { |
| PhysicalExec child = subOp; |
| if (windowAggNode.hasPartitionKeys()) { |
| Column[] grpColumns = windowAggNode.getPartitionKeys(); |
| SortSpec[] sortSpecs = new SortSpec[grpColumns.length]; |
| for (int i = 0; i < grpColumns.length; i++) { |
| sortSpecs[i] = new SortSpec(grpColumns[i], true, false); |
| } |
| |
| SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); |
| sortNode.setSortSpecs(sortSpecs); |
| sortNode.setInSchema(subOp.getSchema()); |
| sortNode.setOutSchema(subOp.getSchema()); |
| child = new ExternalSortExec(context, sortNode, subOp); |
| LOG.info("The planner chooses [Sort Aggregation] in (" + StringUtils.join(sortSpecs) + ")"); |
| } |
| |
| return new WindowAggExec(context, windowAggNode, child); |
| } |
| |
| public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context, |
| DistinctGroupbyNode distinctNode, PhysicalExec subOp) |
| throws IOException { |
| Enforcer enforcer = context.getEnforcer(); |
| EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode); |
| if (property != null) { |
| if (property.getDistinct().getIsMultipleAggregation()) { |
| MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); |
| |
| if (stage == MultipleAggregationStage.FIRST_STAGE) { |
| return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp); |
| } else if (stage == MultipleAggregationStage.SECOND_STAGE) { |
| return new DistinctGroupbySecondAggregationExec(context, distinctNode, |
| createSortExecForDistinctGroupby(context, distinctNode, subOp, 2)); |
| } else { |
| return new DistinctGroupbyThirdAggregationExec(context, distinctNode, |
| createSortExecForDistinctGroupby(context, distinctNode, subOp, 3)); |
| } |
| } else { |
| DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm(); |
| if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) { |
| return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp); |
| } else { |
| return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct()); |
| } |
| } |
| } else { |
| return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp); |
| } |
| } |
| |
  /**
   * Builds the external sort that feeds the 2nd or 3rd stage of multi-stage
   * distinct aggregation. The key order depends on the stage, as noted below.
   *
   * @param context the task attempt context
   * @param distinctNode the distinct group-by node whose keys define the sort order
   * @param subOp the child executor to be sorted
   * @param phase 2 for the second stage, 3 for the third stage
   * @return an external sort executor over the child
   * @throws IOException if the executor cannot be created
   */
  private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
                                                    DistinctGroupbyNode distinctNode,
                                                    PhysicalExec subOp,
                                                    int phase) throws IOException {
    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
    //2 phase: seq, groupby columns, distinct1 keys, distinct2 keys,
    //3 phase: groupby columns, seq, distinct1 keys, distinct2 keys,
    List<SortSpec> sortSpecs = new ArrayList<>();
    if (phase == 2) {
      // stage 2: the sequence column (first target) leads the key
      sortSpecs.add(new SortSpec(distinctNode.getTargets().get(0).getNamedColumn()));
    }
    for (Column eachColumn: distinctNode.getGroupingColumns()) {
      sortSpecs.add(new SortSpec(eachColumn));
    }
    if (phase == 3) {
      // stage 3: the sequence column follows the grouping columns
      sortSpecs.add(new SortSpec(distinctNode.getTargets().get(0).getNamedColumn()));
    }
    // then the grouping columns of every distinct sub plan, in sub-plan order
    for (GroupbyNode eachGroupbyNode: distinctNode.getSubPlans()) {
      for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
        sortSpecs.add(new SortSpec(eachColumn));
      }
    }
    sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[sortSpecs.size()]));
    sortNode.setInSchema(distinctNode.getInSchema());
    sortNode.setOutSchema(distinctNode.getInSchema());
    ExternalSortExec sortExec = new ExternalSortExec(context, sortNode, subOp);

    return sortExec;
  }
| |
| private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx, |
| DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException { |
| return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp); |
| } |
| |
  /**
   * Builds the sort-based executor for distinct aggregation: one external sort and
   * one sort aggregate per sub group-by plan, combined by a
   * DistinctGroupbySortAggregationExec.
   *
   * @param ctx the task attempt context
   * @param distinctGroupbyNode the distinct group-by node to be planned
   * @param subOp the child executor feeding every per-sub-plan sort
   * @param enforcer carries one sort-spec array per sub plan, in the same order
   * @return the combining distinct sort-aggregation executor
   * @throws IOException if an executor cannot be created
   */
  private PhysicalExec createSortAggregationDistinctGroupbyExec(TaskAttemptContext ctx,
      DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp,
      DistinctGroupbyEnforcer enforcer) throws IOException {
    List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getSubPlans();

    SortAggregateExec[] sortAggregateExec = new SortAggregateExec[groupbyNodes.size()];

    // the i-th sort spec array corresponds to the i-th sub group-by plan
    List<SortSpecArray> sortSpecArrays = enforcer.getSortSpecArraysList();

    int index = 0;
    for (GroupbyNode eachGroupbyNode: groupbyNodes) {
      SortSpecArray sortSpecArray = sortSpecArrays.get(index);
      SortSpec[] sortSpecs = new SortSpec[sortSpecArray.getSortSpecsList().size()];
      int sortIndex = 0;
      for (SortSpecProto eachProto: sortSpecArray.getSortSpecsList()) {
        sortSpecs[sortIndex++] = new SortSpec(eachProto);
      }
      // sort the shared child in this sub plan's enforced order, then aggregate
      SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
      sortNode.setSortSpecs(sortSpecs);
      sortNode.setInSchema(subOp.getSchema());
      sortNode.setOutSchema(eachGroupbyNode.getInSchema());
      ExternalSortExec sortExec = new ExternalSortExec(ctx, sortNode, subOp);

      sortAggregateExec[index++] = new SortAggregateExec(ctx, eachGroupbyNode, sortExec);
    }

    return new DistinctGroupbySortAggregationExec(ctx, distinctGroupbyNode, sortAggregateExec);
  }
| |
| public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode, |
| PhysicalExec child) throws IOException { |
| |
| // check if it is a distributed merge sort |
| // If so, it does need to create a sort executor because |
| // the sort executor is created at the scan planning |
| if (child instanceof SortExec) { |
| SortExec childSortExec = (SortExec) child; |
| if (TUtil.checkEquals(sortNode.getSortKeys(), childSortExec.getSortSpecs())) { |
| return child; |
| } |
| } |
| |
| return new ExternalSortExec(context, sortNode, child); |
| } |
| |
  /**
   * Creates a BST index scan executor for the given index scan node. Exactly one
   * fragment must be assigned to the task for the scanned table.
   *
   * @param ctx the task attempt context holding assigned fragments
   * @param annotation the index scan node carrying index path, key schema, and predicates
   * @return a BST index scan executor
   * @throws IOException if the executor cannot be created
   */
  public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
                                          IndexScanNode annotation)
      throws IOException {
    //TODO-general Type Index
    Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
        "Error: There is no table matched to %s", annotation.getCanonicalName());

    // NOTE(review): the existence check above uses getCanonicalName() while the
    // fragment lookup below uses getTableName() — confirm both resolve to the
    // same registered table entry.
    FragmentProto [] fragments = ctx.getTables(annotation.getTableName());
    Preconditions.checkState(fragments.length == 1);
    return new BSTIndexScanExec(ctx, annotation, fragments[0], annotation.getIndexPath(),
        annotation.getKeySchema(), annotation.getPredicates());
  }
| |
| public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) { |
| if (enforcer == null) { |
| return null; |
| } |
| |
| EnforceType type; |
| if (node.getType() == NodeType.JOIN) { |
| type = EnforceType.JOIN; |
| } else if (node.getType() == NodeType.GROUP_BY) { |
| type = EnforceType.GROUP_BY; |
| } else if (node.getType() == NodeType.DISTINCT_GROUP_BY) { |
| type = EnforceType.DISTINCT_GROUP_BY; |
| } else if (node.getType() == NodeType.SORT) { |
| type = EnforceType.SORT; |
| } else if (node instanceof StoreTableNode |
| && ((StoreTableNode)node).hasPartition() |
| && ((StoreTableNode)node).getPartitionMethod().getPartitionType() == PartitionType.COLUMN) { |
| type = EnforceType.COLUMN_PARTITION; |
| } else { |
| return null; |
| } |
| |
| if (enforcer.hasEnforceProperty(type)) { |
| List<EnforceProperty> properties = enforcer.getEnforceProperties(type); |
| EnforceProperty found = null; |
| for (EnforceProperty property : properties) { |
| if (type == EnforceType.JOIN && property.getJoin().getNodeId() == node.getPID()) { |
| found = property; |
| } else if (type == EnforceType.GROUP_BY && property.getGroupby().getNodeId() == node.getPID()) { |
| found = property; |
| } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getNodeId() == node.getPID()) { |
| found = property; |
| } else if (type == EnforceType.SORT && property.getSort().getNodeId() == node.getPID()) { |
| found = property; |
| } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getNodeId() == node.getPID()) { |
| found = property; |
| } |
| } |
| return found; |
| } else { |
| return null; |
| } |
| } |
| } |