/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.SortedInputEnforce;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType;
import static org.apache.tajo.plan.serder.PlanProto.GroupbyEnforce.GroupbyAlgorithm;
import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
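/**
 * PhysicalPlannerImpl translates an annotated {@link LogicalNode} tree into a tree of
 * {@link org.apache.tajo.engine.planner.physical.PhysicalExec} operators. Algorithm choices
 * (e.g., hash vs. merge join, hash vs. sort aggregation) are taken from the {@link Enforcer}
 * when one is given, and otherwise decided from estimated input volumes.
 *
 * A minimal usage sketch (assuming an already initialized {@code TaskAttemptContext} and an
 * annotated logical plan; the variable names here are illustrative):
 * <pre>
 *   PhysicalPlanner planner = new PhysicalPlannerImpl(tajoConf);
 *   PhysicalExec exec = planner.createPlan(taskAttemptContext, logicalRootNode);
 * </pre>
 */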
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
private static final int UNGENERATED_PID = -1;
protected final TajoConf conf;
public PhysicalPlannerImpl(final TajoConf conf) {
this.conf = conf;
}
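  /**
   * Builds the physical execution plan for a task. If the root operator does not already write its
   * own output (a store or shuffle-write executor) and the task has an outgoing {@link DataChannel},
   * a shuffle file write operator is appended on top of the plan.
   */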
public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan) {
PhysicalExec execPlan;
try {
execPlan = createPlanRecursive(context, logicalPlan, new Stack<>());
if (execPlan instanceof StoreTableExec
|| execPlan instanceof RangeShuffleFileWriteExec
|| execPlan instanceof HashShuffleFileWriteExec
|| execPlan instanceof ColPartitionStoreExec) {
return execPlan;
} else if (context.getDataChannel() != null) {
return buildOutputOperator(context, logicalPlan, execPlan);
} else {
return execPlan;
}
} catch (IOException ioe) {
throw new TajoInternalError(ioe);
}
}
private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
PhysicalExec execPlan) throws IOException {
DataChannel channel = context.getDataChannel();
ShuffleFileWriteNode shuffleFileWriteNode = LogicalPlan.createNodeWithoutPID(ShuffleFileWriteNode.class);
shuffleFileWriteNode.setDataFormat(context.getDataChannel().getDataFormat());
shuffleFileWriteNode.setInSchema(plan.getOutSchema());
shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
shuffleFileWriteNode.setShuffle(channel.getShuffleType(), channel.getShuffleKeys(), channel.getShuffleOutputNum());
shuffleFileWriteNode.setChild(plan);
PhysicalExec outExecPlan = createShuffleFileWritePlan(context, shuffleFileWriteNode, execPlan);
return outExecPlan;
}
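  // Recursively descends the logical plan, creating the physical operator for each node bottom-up.
  // The stack holds the ancestors of the current node; e.g., createScanPlan() peeks at it to detect
  // a SORT node sitting directly above a scan.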
private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode, Stack<LogicalNode> stack)
throws IOException {
PhysicalExec leftExec;
PhysicalExec rightExec;
switch (logicalNode.getType()) {
case ROOT:
LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
stack.push(rootNode);
leftExec = createPlanRecursive(ctx, rootNode.getChild(), stack);
stack.pop();
return leftExec;
case EXPRS:
EvalExprNode evalExpr = (EvalExprNode) logicalNode;
return new EvalExprExec(ctx, evalExpr);
case CREATE_TABLE:
case INSERT:
case STORE:
StoreTableNode storeNode = (StoreTableNode) logicalNode;
stack.push(storeNode);
leftExec = createPlanRecursive(ctx, storeNode.getChild(), stack);
stack.pop();
return createStorePlan(ctx, storeNode, leftExec);
case SELECTION:
SelectionNode selNode = (SelectionNode) logicalNode;
stack.push(selNode);
leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
stack.pop();
return new SelectionExec(ctx, selNode, leftExec);
case PROJECTION:
ProjectionNode prjNode = (ProjectionNode) logicalNode;
stack.push(prjNode);
leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
stack.pop();
return new ProjectionExec(ctx, prjNode, leftExec);
case TABLE_SUBQUERY: {
TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
stack.push(subQueryNode);
leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery(), stack);
stack.pop();
return new ProjectionExec(ctx, subQueryNode, leftExec);
}
case PARTITIONS_SCAN:
case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode, stack);
return leftExec;
case GROUP_BY:
GroupbyNode grpNode = (GroupbyNode) logicalNode;
stack.push(grpNode);
leftExec = createPlanRecursive(ctx, grpNode.getChild(), stack);
stack.pop();
return createGroupByPlan(ctx, grpNode, leftExec);
case WINDOW_AGG:
WindowAggNode windowAggNode = (WindowAggNode) logicalNode;
stack.push(windowAggNode);
leftExec = createPlanRecursive(ctx, windowAggNode.getChild(), stack);
stack.pop();
return createWindowAgg(ctx, windowAggNode, leftExec);
case DISTINCT_GROUP_BY:
DistinctGroupbyNode distinctNode = (DistinctGroupbyNode) logicalNode;
stack.push(distinctNode);
leftExec = createPlanRecursive(ctx, distinctNode.getChild(), stack);
stack.pop();
return createDistinctGroupByPlan(ctx, distinctNode, leftExec);
case HAVING:
HavingNode havingNode = (HavingNode) logicalNode;
stack.push(havingNode);
leftExec = createPlanRecursive(ctx, havingNode.getChild(), stack);
stack.pop();
return new HavingExec(ctx, havingNode, leftExec);
case SORT:
SortNode sortNode = (SortNode) logicalNode;
stack.push(sortNode);
leftExec = createPlanRecursive(ctx, sortNode.getChild(), stack);
stack.pop();
return createSortPlan(ctx, sortNode, leftExec);
case JOIN:
JoinNode joinNode = (JoinNode) logicalNode;
stack.push(joinNode);
leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
stack.pop();
return createJoinPlan(ctx, joinNode, leftExec, rightExec);
case UNION:
UnionNode unionNode = (UnionNode) logicalNode;
stack.push(unionNode);
leftExec = createPlanRecursive(ctx, unionNode.getLeftChild(), stack);
rightExec = createPlanRecursive(ctx, unionNode.getRightChild(), stack);
stack.pop();
return new UnionExec(ctx, leftExec, rightExec);
case LIMIT:
LimitNode limitNode = (LimitNode) logicalNode;
stack.push(limitNode);
leftExec = createPlanRecursive(ctx, limitNode.getChild(), stack);
stack.pop();
return new LimitExec(ctx, limitNode.getInSchema(),
limitNode.getOutSchema(), leftExec, limitNode);
case INDEX_SCAN:
IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
leftExec = createIndexScanExec(ctx, indexScanNode);
return leftExec;
case CREATE_INDEX:
CreateIndexNode createIndexNode = (CreateIndexNode) logicalNode;
stack.push(createIndexNode);
leftExec = createPlanRecursive(ctx, createIndexNode.getChild(), stack);
stack.pop();
return new StoreIndexExec(ctx, createIndexNode, leftExec);
default:
return null;
}
}
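  /**
   * Sums the estimated on-disk volume of all fragments of the given tables. This estimate drives
   * the choices between in-memory and external algorithms below.
   */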
@VisibleForTesting
public static long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) {
long size = 0;
for (String tableId : tableIds) {
FragmentProto[] fragmentProtos = ctx.getTables(tableId);
List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
for (Fragment frag : fragments) {
size += TablespaceManager.guessFragmentVolume(ctx.getConf(), frag);
}
}
return size;
}
private boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
throws IOException {
String [] lineage = PlannerUtil.getRelationLineage(node);
long volume = estimateSizeRecursive(context, lineage);
return checkIfInMemoryInnerJoinIsPossible(context, lineage, volume, left);
}
@VisibleForTesting
public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, String [] lineage, long tableVolume,
boolean left) throws IOException {
boolean inMemoryInnerJoinFlag;
QueryContext queryContext = context.getQueryContext();
if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
} else {
inMemoryInnerJoinFlag = tableVolume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT)
* StorageUnit.MB;
}
LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
context.getTaskId().toString(),
(left ? "Left" : "Right"),
StringUtils.join(lineage),
FileUtil.humanReadableByteCount(tableVolume, false),
(inMemoryInnerJoinFlag ? "" : "not ")));
return inMemoryInnerJoinFlag;
}
public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
PhysicalExec rightExec) throws IOException {
switch (joinNode.getJoinType()) {
case CROSS:
return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
case INNER:
return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
case LEFT_OUTER:
return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
case RIGHT_OUTER:
return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
case FULL_OUTER:
return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
case LEFT_SEMI:
return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
case RIGHT_SEMI:
return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
case LEFT_ANTI:
return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
case RIGHT_ANTI:
return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
default:
throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
}
}
private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
default:
        // no dedicated cross join algorithm enforcer is supported; fall back to in-memory hash join
LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
}
} else {
LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
      // switchJoinSidesIfNecessary returns the two children in size order: the smaller at index 0, the larger at index 1.
PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
}
}
private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
        // switchJoinSidesIfNecessary returns the two children in size order: the smaller at index 0, the larger at index 1.
PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
case MERGE_JOIN:
LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
return createMergeInnerJoin(context, plan, leftExec, rightExec);
case HYBRID_HASH_JOIN:
default:
LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
return createMergeInnerJoin(context, plan, leftExec, rightExec);
}
} else {
return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
}
}
  /**
   * Returns the two {@link org.apache.tajo.engine.planner.physical.PhysicalExec}s ordered by the total volume of
   * their child relations: the smaller side is placed at index 0, and the larger side at index 1.
   */
@VisibleForTesting
public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan,
PhysicalExec left, PhysicalExec right) throws IOException {
String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
PhysicalExec smaller;
PhysicalExec larger;
if (leftSize <= rightSize) {
smaller = left;
larger = right;
LOG.info(String.format("[%s] Left relations %s (%s) is smaller than Right relations %s (%s).",
context.getTaskId().toString(),
StringUtils.join(leftLineage),
FileUtil.humanReadableByteCount(leftSize, false),
StringUtils.join(rightLineage),
FileUtil.humanReadableByteCount(rightSize, false)));
} else {
smaller = right;
larger = left;
LOG.info(String.format("[%s] Right relations %s (%s) is smaller than Left relations %s (%s).",
context.getTaskId().toString(),
StringUtils.join(rightLineage),
FileUtil.humanReadableByteCount(rightSize, false),
StringUtils.join(leftLineage),
FileUtil.humanReadableByteCount(leftSize, false)));
}
return new PhysicalExec [] {smaller, larger};
}
private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
boolean inMemoryHashJoin = false;
if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
|| checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) {
inMemoryHashJoin = true;
}
if (inMemoryHashJoin) {
LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
      // switchJoinSidesIfNecessary returns the two children in size order: the smaller at index 0, the larger at index 1.
PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
} else {
return createMergeInnerJoin(context, plan, leftExec, rightExec);
}
}
private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
leftSortNode.setSortSpecs(sortSpecs[0]);
leftSortNode.setInSchema(leftExec.getSchema());
leftSortNode.setOutSchema(leftExec.getSchema());
ExternalSortExec outerSort = new ExternalSortExec(context, leftSortNode, leftExec);
SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
rightSortNode.setSortSpecs(sortSpecs[1]);
rightSortNode.setInSchema(rightExec.getSchema());
rightSortNode.setOutSchema(rightExec.getSchema());
ExternalSortExec innerSort = new ExternalSortExec(context, rightSortNode, rightExec);
LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
}
private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
case MERGE_JOIN:
        // the enforcer explicitly requests the merge join implementation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
default:
LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
}
} else {
return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
}
}
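  /**
   * Checks whether the given relation is small enough to serve as the in-memory build side of a hash
   * outer join, based on {@link SessionVars#OUTER_HASH_JOIN_SIZE_LIMIT} when set, and
   * {@link SessionVars#HASH_JOIN_SIZE_LIMIT} otherwise.
   */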
private static boolean isHashOuterJoinFeasible(TaskAttemptContext context, LogicalNode innerRelation) {
String [] rightLineage = PlannerUtil.getRelationLineage(innerRelation);
long estimatedVolume = estimateSizeRecursive(context, rightLineage);
QueryContext queryContext = context.getQueryContext();
if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
return estimatedVolume <= queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
} else {
return estimatedVolume <= queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT) * StorageUnit.MB;
}
}
private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
if (isHashOuterJoinFeasible(context, plan.getRightChild())) {
// we can implement left outer join using hash join, using the right operand as the build relation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
}
else {
//the right operand is too large, so we opt for merge join implementation
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Merge Join].");
return createRightOuterMergeJoinPlan(context, plan, rightExec, leftExec);
}
}
private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
    // If the left operand is small enough, implement the right outer join as a left outer hash join
    // with exchanged operands (note: this is blocking, but merge join is blocking as well).
if (isHashOuterJoinFeasible(context, plan.getLeftChild())){
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
} else {
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
}
private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
//the left operand is too large, so opt for merge join implementation
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
SortSpec[][] sortSpecs2 = PlannerUtil.getSortKeysFromJoinQual(
plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
SortNode leftSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class);
leftSortNode2.setSortSpecs(sortSpecs2[0]);
leftSortNode2.setInSchema(leftExec.getSchema());
leftSortNode2.setOutSchema(leftExec.getSchema());
ExternalSortExec outerSort2 = new ExternalSortExec(context, leftSortNode2, leftExec);
SortNode rightSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class);
rightSortNode2.setSortSpecs(sortSpecs2[1]);
rightSortNode2.setInSchema(rightExec.getSchema());
rightSortNode2.setOutSchema(rightExec.getSchema());
ExternalSortExec innerSort2 = new ExternalSortExec(context, rightSortNode2, rightExec);
return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
}
private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
case MERGE_JOIN:
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
default:
LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
} else {
return createBestRightJoinPlan(context, plan, leftExec, rightExec);
}
}
private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
case MERGE_JOIN:
return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
default:
LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback to join algorithm: " + JoinAlgorithm.MERGE_JOIN);
return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
} else {
return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
}
}
private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
PhysicalExec selectedRight;
PhysicalExec selectedLeft;
    // HashJoinExec loads the smaller relation into memory.
if (outerSize2 <= innerSize2) {
selectedLeft = leftExec;
selectedRight = rightExec;
} else {
selectedLeft = rightExec;
selectedRight = leftExec;
}
LOG.info("Full Outer Join (" + plan.getPID() + ") chooses [Hash Join]");
return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
}
private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
    // the relations are too large for an in-memory hash join, so use the merge join implementation
LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]");
SortSpec[][] sortSpecs3 = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(),
leftExec.getSchema(), rightExec.getSchema());
SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
leftSortNode.setSortSpecs(sortSpecs3[0]);
leftSortNode.setInSchema(leftExec.getSchema());
leftSortNode.setOutSchema(leftExec.getSchema());
ExternalSortExec outerSort3 = new ExternalSortExec(context, leftSortNode, leftExec);
SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
rightSortNode.setSortSpecs(sortSpecs3[1]);
rightSortNode.setInSchema(rightExec.getSchema());
rightSortNode.setOutSchema(rightExec.getSchema());
ExternalSortExec innerSort3 = new ExternalSortExec(context, rightSortNode, rightExec);
return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
}
private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
    // The inner (right) relation is always expected to be smaller than the outer (left) relation
    // (see GreedyHeuristicJoinOrderAlgorithm#swapLeftAndRightIfNecessary()).
    // Thus, we only need to check whether the right table can be loaded into main memory.
if (isHashOuterJoinFeasible(context, plan.getRightChild())) {
return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
} else {
return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
}
}
  /**
   * Left semi join: the left side is the FROM side table, and the right side is the IN side table.
   */
private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
        default:
          // TODO: implement a sort-based semi join operator
          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
          LOG.error("Choose a fallback join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
          return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
}
} else {
LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
}
}
  /**
   * Right semi join: the left side is the IN side table, and the right side is the FROM side table.
   */
private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
        case IN_MEMORY_HASH_JOIN:
          LOG.info("Right Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
        default:
          // TODO: implement a sort-based semi join operator
          LOG.error("Invalid Right Semi Join Algorithm Enforcer: " + algorithm.name());
          LOG.error("Choose a fallback join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
      }
    } else {
      LOG.info("Right Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
}
}
  /**
   * Left anti join: the left side is the FROM side table, and the right side is the NOT IN side table.
   */
private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
        case IN_MEMORY_HASH_JOIN:
          LOG.info("Left Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
        default:
          // TODO: implement a sort-based anti join operator
          LOG.error("Invalid Left Anti Join Algorithm Enforcer: " + algorithm.name());
          LOG.error("Choose a fallback join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
      }
    } else {
      LOG.info("Left Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
}
}
  /**
   * Right anti join: the left side is the NOT IN side table, and the right side is the FROM side table.
   */
private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
        case IN_MEMORY_HASH_JOIN:
          LOG.info("Right Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
          return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
        default:
          // TODO: implement a sort-based anti join operator
          LOG.error("Invalid Right Anti Join Algorithm Enforcer: " + algorithm.name());
          LOG.error("Choose a fallback join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
          return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
      }
    } else {
      LOG.info("Right Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
      return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
}
}
/**
 * Create a shuffle file write executor that stores intermediate data on local disks.
*/
public PhysicalExec createShuffleFileWritePlan(TaskAttemptContext ctx,
ShuffleFileWriteNode plan, PhysicalExec subOp) throws IOException {
plan.getOptions().set(StorageConstants.SHUFFLE_TYPE,
PlannerUtil.getShuffleType(ctx.getDataChannel().getShuffleType()));
switch (plan.getShuffleType()) {
case HASH_SHUFFLE:
case SCATTERED_HASH_SHUFFLE:
return new HashShuffleFileWriteExec(ctx, plan, subOp);
      case RANGE_SHUFFLE:
        SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
        SortSpec[] sortSpecs;
        if (sortExec != null) {
          sortSpecs = sortExec.getSortSpecs();
        } else {
          // no sort executor is found below; derive the sort order from the shuffle keys
          Column[] columns = ctx.getDataChannel().getShuffleKeys();
          sortSpecs = new SortSpec[columns.length];
          for (int i = 0; i < columns.length; i++) {
            sortSpecs[i] = new SortSpec(columns[i]);
          }
        }
        return new RangeShuffleFileWriteExec(ctx, plan, subOp, sortSpecs);
case NONE_SHUFFLE:
        // If no NULL CHAR property is given in the table property and the query is neither CTAS nor INSERT,
        // we set the default NULL CHAR in the table property.
if (!ctx.getQueryContext().containsKey(SessionVars.NULL_CHAR)) {
plan.getOptions().set(StorageConstants.TEXT_NULL, TajoConf.ConfVars.$TEXT_NULL.defaultVal);
}
return new StoreTableExec(ctx, plan, subOp);
default:
throw new IllegalStateException(ctx.getDataChannel().getShuffleType() + " is not supported yet.");
}
}
/**
 * Create an executor to store a table into HDFS. This is used for CREATE TABLE AS
 * or INSERT (OVERWRITE) INTO statements.
*/
public PhysicalExec createStorePlan(TaskAttemptContext ctx,
StoreTableNode plan, PhysicalExec subOp) throws IOException {
if (plan.getPartitionMethod() != null) {
switch (plan.getPartitionMethod().getPartitionType()) {
case COLUMN:
return createColumnPartitionStorePlan(ctx, plan, subOp);
default:
throw new IllegalStateException(plan.getPartitionMethod().getPartitionType() + " is not supported yet.");
}
} else {
return new StoreTableExec(ctx, plan, subOp);
}
}
private PhysicalExec createColumnPartitionStorePlan(TaskAttemptContext context,
StoreTableNode storeTableNode,
PhysicalExec child) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, storeTableNode);
if (property != null) {
ColumnPartitionAlgorithm algorithm = property.getColumnPartition().getAlgorithm();
switch (algorithm) {
case HASH_PARTITION:
return createHashColumnPartitionStorePlan(context, storeTableNode, child);
case SORT_PARTITION: // default algorithm
default:
return createSortBasedColumnPartitionStorePlan(context, storeTableNode, child);
}
} else { // default algorithm is sorted-based column partition
return createSortBasedColumnPartitionStorePlan(context, storeTableNode, child);
}
}
private PhysicalExec createHashColumnPartitionStorePlan(TaskAttemptContext context,
StoreTableNode storeTableNode,
PhysicalExec child) throws IOException {
LOG.info("The planner chooses [Hash-based Column Partitioned Store] algorithm");
return new HashBasedColPartitionStoreExec(context, storeTableNode, child);
}
private PhysicalExec createSortBasedColumnPartitionStorePlan(TaskAttemptContext context,
StoreTableNode storeTableNode,
PhysicalExec child) throws IOException {
Column[] partitionKeyColumns = storeTableNode.getPartitionMethod().getExpressionSchema().toArray();
SortSpec[] sortSpecs = new SortSpec[partitionKeyColumns.length];
    if (storeTableNode.getType() == NodeType.INSERT) {
      InsertNode insertNode = (InsertNode) storeTableNode;
      int i = 0;
      for (Column column : partitionKeyColumns) {
        int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
        sortSpecs[i++] = new SortSpec(insertNode.getProjectedSchema().getColumn(id), true, false);
      }
    } else if (storeTableNode.getType() == NodeType.CREATE_TABLE) {
      // in CTAS, the partition key columns follow the columns of the output schema in the input schema
      for (int i = 0; i < partitionKeyColumns.length; i++) {
        int id = storeTableNode.getOutSchema().getRootColumns().size() + i;
        Column column = storeTableNode.getInSchema().getColumn(id);
        sortSpecs[i] = new SortSpec(column, true, false);
      }
} else {
for (int i = 0; i < partitionKeyColumns.length; i++) {
sortSpecs[i] = new SortSpec(partitionKeyColumns[i], true, false);
}
}
SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
sortNode.setSortSpecs(sortSpecs);
sortNode.setInSchema(child.getSchema());
sortNode.setOutSchema(child.getSchema());
ExternalSortExec sortExec = new ExternalSortExec(context, sortNode, child);
LOG.info("The planner chooses [Sort-based Column Partitioned Store] algorithm");
return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec);
}
private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) {
Enforcer enforcer = ctx.getEnforcer();
List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT);
if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) {
SortNode sortNode = (SortNode) node.peek();
SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
SortSpec [] sortSpecs = LogicalNodeDeserializer.convertSortSpecs(sortEnforcer.getSortSpecsList());
return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs);
} else {
return false;
}
}
public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
throws IOException {
    // check whether the input is already sorted in the same order as the subsequent sort operator.
if (checkIfSortEquivalance(ctx, scanNode, node)) {
if (ctx.getTable(scanNode.getCanonicalName()) == null) {
return new SeqScanExec(ctx, scanNode, null);
}
FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
return new ExternalSortExec(ctx, (SortNode) node.peek(), scanNode, fragments);
} else {
Enforcer enforcer = ctx.getEnforcer();
// check if this table is broadcasted one or not.
boolean broadcastFlag = false;
if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
for (EnforceProperty property : properties) {
broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
}
}
if (scanNode instanceof PartitionedTableScanNode
&& ((PartitionedTableScanNode)scanNode).getInputPaths() != null &&
((PartitionedTableScanNode)scanNode).getInputPaths().length > 0) {
if (broadcastFlag) {
PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
List<Fragment> fileFragments = new ArrayList<>();
FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri());
for (Path path : partitionedTableScanNode.getInputPaths()) {
fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path)));
}
FragmentProto[] fragments =
FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()]));
ctx.addFragments(scanNode.getCanonicalName(), fragments);
return new PartitionMergeScanExec(ctx, scanNode, fragments);
}
}
if (ctx.getTable(scanNode.getCanonicalName()) == null) {
return new SeqScanExec(ctx, scanNode, null);
}
FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
return new SeqScanExec(ctx, scanNode, fragments);
}
}
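  /**
   * Creates an aggregation operator. The algorithm is taken from the enforcer when given; otherwise
   * hash aggregation is chosen if the estimated input volume is within
   * {@link SessionVars#HASH_GROUPBY_SIZE_LIMIT}, and sort aggregation otherwise.
   */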
  public PhysicalExec createGroupByPlan(TaskAttemptContext context, GroupbyNode groupbyNode, PhysicalExec subOp)
throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, groupbyNode);
if (property != null) {
GroupbyAlgorithm algorithm = property.getGroupby().getAlgorithm();
if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) {
return createInMemoryHashAggregation(context, groupbyNode, subOp);
} else {
return createSortAggregation(context, property, groupbyNode, subOp);
}
}
return createBestAggregationPlan(context, groupbyNode, subOp);
}
  private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx, GroupbyNode groupbyNode, PhysicalExec subOp)
throws IOException {
LOG.info("The planner chooses [Hash Aggregation]");
return new HashAggregateExec(ctx, groupbyNode, subOp);
}
private PhysicalExec createSortAggregation(TaskAttemptContext ctx, EnforceProperty property, GroupbyNode groupbyNode,
PhysicalExec subOp) throws IOException {
Column[] grpColumns = groupbyNode.getGroupingColumns();
SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
for (int i = 0; i < grpColumns.length; i++) {
sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
}
if (property != null) {
List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
List<SortSpec> enforcedSortSpecList = Lists.newArrayList();
int i = 0;
outer:
for (SortSpecProto sortSpecProto : sortSpecProtos) {
SortSpec enforcedSortSpecs = new SortSpec(sortSpecProto);
for (Column grpKey : grpColumns) { // if this sort key is included in grouping columns, skip it.
if (enforcedSortSpecs.getSortKey().equals(grpKey)) {
continue outer;
}
}
enforcedSortSpecList.add(enforcedSortSpecs);
}
sortSpecs = ObjectArrays.concat(sortSpecs, TUtil.toArray(enforcedSortSpecList, SortSpec.class), SortSpec.class);
}
SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
sortNode.setSortSpecs(sortSpecs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(subOp.getSchema());
ExternalSortExec sortExec = new ExternalSortExec(ctx, sortNode, subOp);
LOG.info("The planner chooses [Sort Aggregation] in (" + StringUtils.join(sortSpecs) + ")");
return new SortAggregateExec(ctx, groupbyNode, sortExec);
}
private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
PhysicalExec subOp) throws IOException {
Column[] grpColumns = groupbyNode.getGroupingColumns();
if (grpColumns.length == 0) {
return createInMemoryHashAggregation(context, groupbyNode, subOp);
}
String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT) * StorageUnit.MB;
// if the relation size is less than the threshold,
// the hash aggregation will be used.
LOG.info("Aggregation:estimatedSize=" + estimatedSize + ", threshold=" + threshold);
if (estimatedSize <= threshold) {
LOG.info("The planner chooses [Hash Aggregation]");
return createInMemoryHashAggregation(context, groupbyNode, subOp);
} else {
return createSortAggregation(context, null, groupbyNode, subOp);
}
}
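  // Window aggregation requires its input grouped by the partition keys, so an external sort on
  // those keys is injected below the WindowAggExec whenever partition keys exist.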
  public PhysicalExec createWindowAgg(TaskAttemptContext context, WindowAggNode windowAggNode, PhysicalExec subOp)
throws IOException {
PhysicalExec child = subOp;
if (windowAggNode.hasPartitionKeys()) {
Column[] grpColumns = windowAggNode.getPartitionKeys();
SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
for (int i = 0; i < grpColumns.length; i++) {
sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
}
SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
sortNode.setSortSpecs(sortSpecs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(subOp.getSchema());
child = new ExternalSortExec(context, sortNode, subOp);
LOG.info("The planner chooses [Sort Aggregation] in (" + StringUtils.join(sortSpecs) + ")");
}
return new WindowAggExec(context, windowAggNode, child);
}
public PhysicalExec createDistinctGroupByPlan(TaskAttemptContext context,
DistinctGroupbyNode distinctNode, PhysicalExec subOp)
throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, distinctNode);
if (property != null) {
if (property.getDistinct().getIsMultipleAggregation()) {
MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
if (stage == MultipleAggregationStage.FIRST_STAGE) {
return new DistinctGroupbyFirstAggregationExec(context, distinctNode, subOp);
} else if (stage == MultipleAggregationStage.SECOND_STAGE) {
return new DistinctGroupbySecondAggregationExec(context, distinctNode,
createSortExecForDistinctGroupby(context, distinctNode, subOp, 2));
} else {
return new DistinctGroupbyThirdAggregationExec(context, distinctNode,
createSortExecForDistinctGroupby(context, distinctNode, subOp, 3));
}
} else {
DistinctAggregationAlgorithm algorithm = property.getDistinct().getAlgorithm();
if (algorithm == DistinctAggregationAlgorithm.HASH_AGGREGATION) {
return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
} else {
return createSortAggregationDistinctGroupbyExec(context, distinctNode, subOp, property.getDistinct());
}
}
} else {
return createInMemoryDistinctGroupbyExec(context, distinctNode, subOp);
}
}
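  /**
   * Builds the external sort required by the second and third stages of the multi-stage distinct
   * aggregation; the stage determines where the sequence column is placed in the sort key order
   * (see the comments below).
   */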
private SortExec createSortExecForDistinctGroupby(TaskAttemptContext context,
DistinctGroupbyNode distinctNode,
PhysicalExec subOp,
int phase) throws IOException {
SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
    // 2nd phase sort order: seq, groupby columns, distinct1 keys, distinct2 keys, ...
    // 3rd phase sort order: groupby columns, seq, distinct1 keys, distinct2 keys, ...
List<SortSpec> sortSpecs = new ArrayList<>();
if (phase == 2) {
sortSpecs.add(new SortSpec(distinctNode.getTargets().get(0).getNamedColumn()));
}
for (Column eachColumn: distinctNode.getGroupingColumns()) {
sortSpecs.add(new SortSpec(eachColumn));
}
if (phase == 3) {
sortSpecs.add(new SortSpec(distinctNode.getTargets().get(0).getNamedColumn()));
}
for (GroupbyNode eachGroupbyNode: distinctNode.getSubPlans()) {
for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
sortSpecs.add(new SortSpec(eachColumn));
}
}
sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[sortSpecs.size()]));
sortNode.setInSchema(distinctNode.getInSchema());
sortNode.setOutSchema(distinctNode.getInSchema());
ExternalSortExec sortExec = new ExternalSortExec(context, sortNode, subOp);
return sortExec;
}
private PhysicalExec createInMemoryDistinctGroupbyExec(TaskAttemptContext ctx,
DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp) throws IOException {
return new DistinctGroupbyHashAggregationExec(ctx, distinctGroupbyNode, subOp);
}
private PhysicalExec createSortAggregationDistinctGroupbyExec(TaskAttemptContext ctx,
DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp,
DistinctGroupbyEnforcer enforcer) throws IOException {
List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getSubPlans();
SortAggregateExec[] sortAggregateExec = new SortAggregateExec[groupbyNodes.size()];
List<SortSpecArray> sortSpecArrays = enforcer.getSortSpecArraysList();
int index = 0;
for (GroupbyNode eachGroupbyNode: groupbyNodes) {
SortSpecArray sortSpecArray = sortSpecArrays.get(index);
SortSpec[] sortSpecs = new SortSpec[sortSpecArray.getSortSpecsList().size()];
int sortIndex = 0;
for (SortSpecProto eachProto: sortSpecArray.getSortSpecsList()) {
sortSpecs[sortIndex++] = new SortSpec(eachProto);
}
SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
sortNode.setSortSpecs(sortSpecs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(eachGroupbyNode.getInSchema());
ExternalSortExec sortExec = new ExternalSortExec(ctx, sortNode, subOp);
sortAggregateExec[index++] = new SortAggregateExec(ctx, eachGroupbyNode, sortExec);
}
return new DistinctGroupbySortAggregationExec(ctx, distinctGroupbyNode, sortAggregateExec);
}
public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
PhysicalExec child) throws IOException {
    // Check whether this sort is a distributed merge sort.
    // If so, it does not need to create a sort executor because
    // the sort executor was already created during scan planning.
if (child instanceof SortExec) {
SortExec childSortExec = (SortExec) child;
if (TUtil.checkEquals(sortNode.getSortKeys(), childSortExec.getSortSpecs())) {
return child;
}
}
return new ExternalSortExec(context, sortNode, child);
}
public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
IndexScanNode annotation)
throws IOException {
//TODO-general Type Index
Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
"Error: There is no table matched to %s", annotation.getCanonicalName());
FragmentProto [] fragments = ctx.getTables(annotation.getTableName());
Preconditions.checkState(fragments.length == 1);
return new BSTIndexScanExec(ctx, annotation, fragments[0], annotation.getIndexPath(),
annotation.getKeySchema(), annotation.getPredicates());
}
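  /**
   * Finds the enforce property that pins an algorithm choice for the given logical node, matching
   * by node type and PID. Returns null if no enforcer is given or no matching property exists.
   */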
public static EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
if (enforcer == null) {
return null;
}
EnforceType type;
if (node.getType() == NodeType.JOIN) {
type = EnforceType.JOIN;
} else if (node.getType() == NodeType.GROUP_BY) {
type = EnforceType.GROUP_BY;
} else if (node.getType() == NodeType.DISTINCT_GROUP_BY) {
type = EnforceType.DISTINCT_GROUP_BY;
} else if (node.getType() == NodeType.SORT) {
type = EnforceType.SORT;
} else if (node instanceof StoreTableNode
&& ((StoreTableNode)node).hasPartition()
&& ((StoreTableNode)node).getPartitionMethod().getPartitionType() == PartitionType.COLUMN) {
type = EnforceType.COLUMN_PARTITION;
} else {
return null;
}
if (enforcer.hasEnforceProperty(type)) {
List<EnforceProperty> properties = enforcer.getEnforceProperties(type);
EnforceProperty found = null;
for (EnforceProperty property : properties) {
if (type == EnforceType.JOIN && property.getJoin().getNodeId() == node.getPID()) {
found = property;
} else if (type == EnforceType.GROUP_BY && property.getGroupby().getNodeId() == node.getPID()) {
found = property;
} else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getNodeId() == node.getPID()) {
found = property;
} else if (type == EnforceType.SORT && property.getSort().getNodeId() == node.getPID()) {
found = property;
} else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getNodeId() == node.getPID()) {
found = property;
}
}
return found;
} else {
return null;
}
}
}