blob: 1e4fba9a1ff82a586da006982152f591b0a90c61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
import org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
import org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
import org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
import org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.IndexUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
/** PID assigned to logical nodes that this planner synthesizes itself (store and sort nodes). */
private static final int UNGENERATED_PID = -1;
protected final TajoConf conf;
protected final AbstractStorageManager sm;

/**
 * Creates a physical planner.
 *
 * @param conf Tajo configuration; join-size thresholds are read from it during planning
 * @param sm   storage manager handed to the scan, sort and store operators this planner builds
 */
public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
  this.sm = sm;
  this.conf = conf;
}
/**
 * Builds the physical operator tree for an execution plan.
 * <p>
 * Before translation, {@code checkOutputOperator} places a store node in front of every
 * outgoing data channel whose source is not already a store node. The resulting plan is then
 * translated starting from its terminal node.
 *
 * @param context the task attempt context (outgoing channels, fragments, enforcer)
 * @param plan    the execution plan to translate; it may be modified in place
 * @return the root of the physical operator tree
 * @throws InternalException wrapping any {@link IOException} raised while planning
 */
public PhysicalExec createPlan(final TaskAttemptContext context, ExecutionPlan plan)
    throws InternalException {
  try {
    ExecutionPlan withOutputs = checkOutputOperator(context, plan);
    return createPlanRecursive(context, withOutputs, withOutputs.getTerminalNode());
  } catch (IOException ioe) {
    throw new InternalException(ioe);
  }
}
/**
 * Translates an execution plan into physical operators exactly as given, without inserting
 * store operators for outgoing data channels. Intended for tests.
 *
 * @param context the task attempt context
 * @param plan    the execution plan to translate
 * @return the root of the physical operator tree
 * @throws InternalException wrapping any {@link IOException} raised while planning
 */
@VisibleForTesting
public PhysicalExec createPlanWithoutMaterialize(final TaskAttemptContext context, ExecutionPlan plan)
    throws InternalException {
  LogicalNode terminal = plan.getTerminalNode();
  try {
    return createPlanRecursive(context, plan, terminal);
  } catch (IOException cause) {
    throw new InternalException(cause);
  }
}
/**
 * Ensures every outgoing data channel of this task is fed by a store node.
 * <p>
 * For each outgoing channel whose source plan group is not rooted at a {@code STORE} node, a
 * new {@link StoreTableNode} is created. It targets the channel, uses CSV storage, and is
 * partitioned as the channel describes. The node is placed on top of the plan's first plan
 * group and installed with {@code plan.setPlan(...)}, and the channel's source PID is
 * redirected to it.
 *
 * @param context the task attempt context supplying the outgoing channels
 * @param plan    the execution plan; modified in place
 * @return the same {@code plan} instance
 */
private ExecutionPlan checkOutputOperator(TaskAttemptContext context, ExecutionPlan plan) {
  List<DataChannel> channels = context.getOutgoingChannels();
  for (DataChannel channel : channels) {
    LogicalNode srcRoot = plan.getPlanGroupWithPID(channel.getSrcPID()).getRootNode();
    if (srcRoot.getType() == NodeType.STORE) {
      // The channel is already fed by a store node; nothing to add.
      continue;
    }

    StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
    // Channel output written by this synthesized store always uses CSV storage.
    storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
    storeTableNode.setInSchema(channel.getSchema());
    storeTableNode.setOutSchema(channel.getSchema());
    if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
      storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
    } else {
      storeTableNode.setDefaultParition();
    }

    // NOTE(review): the store is always placed over the FIRST plan group, not the group
    // identified by channel.getSrcPID(); confirm this is intended when a task has several
    // outgoing channels.
    LogicalNode topNode = plan.getFirstPlanGroup().toLinkedLogicalNode();
    storeTableNode.setChild(topNode);
    plan.setPlan(storeTableNode);
    channel.updateSrcPID(storeTableNode.getPID());
  }
  return plan;
}
/**
 * Recursively translates {@code logicalNode} and its children (as reported by {@code plan})
 * into physical operators.
 * <p>
 * For node types that can feed several consumers, the operator is wrapped in a
 * {@link MultiOutputExec} when the node has more than one parent in the plan. ROOT, EXPRS
 * and STORE nodes are never wrapped.
 *
 * @param ctx         the task attempt context
 * @param plan        the execution plan used to look up children and parent counts
 * @param logicalNode the node to translate
 * @return the physical operator for {@code logicalNode}, or {@code null} for node types not
 *         handled here
 * @throws IOException if building any operator fails
 */
private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode logicalNode) throws IOException {
  switch (logicalNode.getType()) {
    case ROOT: {
      LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
      List<PhysicalExec> childExecs = new ArrayList<PhysicalExec>();
      for (LogicalNode child : plan.getChilds(rootNode)) {
        childExecs.add(createPlanRecursive(ctx, plan, child));
      }
      return new PhysicalRootExec(ctx, childExecs);
    }
    case EXPRS: {
      EvalExprNode evalExpr = (EvalExprNode) logicalNode;
      return new EvalExprExec(ctx, evalExpr);
    }
    case STORE: {
      StoreTableNode storeNode = (StoreTableNode) logicalNode;
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChilds(storeNode).get(0));
      return createStorePlan(ctx, storeNode, childExec);
    }
    case SELECTION: {
      SelectionNode selNode = (SelectionNode) logicalNode;
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChilds(selNode).get(0));
      return wrapIfSharedByParents(ctx, plan, logicalNode, new SelectionExec(ctx, selNode, childExec));
    }
    case PROJECTION: {
      ProjectionNode prjNode = (ProjectionNode) logicalNode;
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChilds(prjNode).get(0));
      return wrapIfSharedByParents(ctx, plan, logicalNode, new ProjectionExec(ctx, prjNode, childExec));
    }
    case TABLE_SUBQUERY: {
      TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
      // A table subquery gets no operator of its own; its child's operator is used directly.
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChild(subQueryNode, 0));
      return wrapIfSharedByParents(ctx, plan, logicalNode, childExec);
    }
    case PARTITIONS_SCAN:
    case SCAN:
      return wrapIfSharedByParents(ctx, plan, logicalNode, createScanPlan(ctx, (ScanNode) logicalNode));
    case GROUP_BY: {
      GroupbyNode grpNode = (GroupbyNode) logicalNode;
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChilds(grpNode).get(0));
      return wrapIfSharedByParents(ctx, plan, logicalNode, createGroupByPlan(ctx, plan, grpNode, childExec));
    }
    case SORT: {
      SortNode sortNode = (SortNode) logicalNode;
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChilds(sortNode).get(0));
      return wrapIfSharedByParents(ctx, plan, logicalNode, createSortPlan(ctx, plan, sortNode, childExec));
    }
    case JOIN: {
      JoinNode joinNode = (JoinNode) logicalNode;
      List<LogicalNode> childs = plan.getChilds(joinNode);
      PhysicalExec leftExec = createPlanRecursive(ctx, plan, childs.get(0));
      PhysicalExec rightExec = createPlanRecursive(ctx, plan, childs.get(1));
      return wrapIfSharedByParents(ctx, plan, logicalNode,
          createJoinPlan(ctx, plan, joinNode, leftExec, rightExec));
    }
    case UNION: {
      UnionNode unionNode = (UnionNode) logicalNode;
      List<LogicalNode> childs = plan.getChilds(unionNode);
      PhysicalExec leftExec = createPlanRecursive(ctx, plan, childs.get(0));
      PhysicalExec rightExec = createPlanRecursive(ctx, plan, childs.get(1));
      return wrapIfSharedByParents(ctx, plan, logicalNode, new UnionExec(ctx, leftExec, rightExec));
    }
    case LIMIT: {
      LimitNode limitNode = (LimitNode) logicalNode;
      PhysicalExec childExec = createPlanRecursive(ctx, plan, plan.getChilds(limitNode).get(0));
      return wrapIfSharedByParents(ctx, plan, logicalNode,
          new LimitExec(ctx, limitNode.getInSchema(), limitNode.getOutSchema(), childExec, limitNode));
    }
    case BST_INDEX_SCAN: {
      IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
      return wrapIfSharedByParents(ctx, plan, logicalNode, createIndexScanExec(ctx, indexScanNode));
    }
    default:
      // NOTE(review): unsupported node types yield null; callers do not check for it.
      return null;
  }
}

/**
 * Wraps {@code exec} in a {@link MultiOutputExec} when {@code node} has more than one parent
 * in {@code plan}, so its output can be consumed by every parent; otherwise returns
 * {@code exec} unchanged.
 */
private PhysicalExec wrapIfSharedByParents(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode node,
    PhysicalExec exec) throws IOException {
  if (plan.getParentCount(node) > 1) {
    return new MultiOutputExec(ctx, exec.getSchema(), exec, plan.getParentCount(node));
  }
  return exec;
}
/**
 * Estimates the input volume of the given tables by summing {@code getEndKey()} over every
 * fragment the task holds for each table.
 * <p>
 * Despite its name, this method is not recursive; it iterates over {@code tableIds} once.
 * NOTE(review): presumably {@code getEndKey()} is a byte offset/length; confirm in FileFragment.
 *
 * @param ctx      the task attempt context that holds the fragments of each table
 * @param tableIds IDs of the tables to include
 * @return the summed estimate; 0 when {@code tableIds} is empty
 * @throws IOException if converting fragments fails
 */
private long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
  long totalSize = 0L;
  for (int t = 0; t < tableIds.length; t++) {
    // TODO - CSV is a hack.
    List<FileFragment> tableFragments =
        FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, ctx.getTables(tableIds[t]));
    for (FileFragment fragment : tableFragments) {
      totalSize += fragment.getEndKey();
    }
  }
  return totalSize;
}
/**
 * Dispatches to the join planning routine for the join type of {@code joinNode}.
 *
 * @param context   the task attempt context
 * @param plan      the execution plan containing {@code joinNode}
 * @param joinNode  the logical join node
 * @param leftExec  physical operator for the left input
 * @param rightExec physical operator for the right input
 * @return the physical join operator
 * @throws IOException if building the operator fails; a {@link PhysicalPlanningException} is
 *         thrown for unsupported join types
 */
public PhysicalExec createJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode joinNode,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  final PhysicalExec joinExec;
  switch (joinNode.getJoinType()) {
    case CROSS:
      joinExec = createCrossJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case INNER:
      joinExec = createInnerJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case LEFT_OUTER:
      joinExec = createLeftOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case RIGHT_OUTER:
      joinExec = createRightOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case FULL_OUTER:
      joinExec = createFullOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case LEFT_SEMI:
      joinExec = createLeftSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case RIGHT_SEMI:
      joinExec = createRightSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case LEFT_ANTI:
      joinExec = createLeftAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    case RIGHT_ANTI:
      joinExec = createRightAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
      break;
    default:
      throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
  }
  return joinExec;
}
/**
 * Plans a cross join. Without an enforced algorithm, a block nested loop join is used.
 * An enforced nested loop or block nested loop join is honored; any other enforced algorithm
 * is logged as invalid and replaced by a block nested loop join.
 */
private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  EnforceProperty property = getAlgorithmEnforceProperty(context.getEnforcer(), join);
  if (property == null) {
    return new BNLJoinExec(context, join, leftExec, rightExec);
  }

  JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
  if (algorithm == JoinAlgorithm.NESTED_LOOP_JOIN) {
    LOG.info("Join (" + join.getPID() +") chooses [Nested Loop Join]");
    return new NLJoinExec(context, join, leftExec, rightExec);
  }
  if (algorithm == JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN) {
    LOG.info("Join (" + join.getPID() +") chooses [Block Nested Loop Join]");
    return new BNLJoinExec(context, join, leftExec, rightExec);
  }

  // fallback algorithm
  LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
  return new BNLJoinExec(context, join, leftExec, rightExec);
}
/**
 * Plans an inner join. If the enforcer names an algorithm, that algorithm is used: nested
 * loop, block nested loop, in-memory hash, or merge join. Any other enforced algorithm
 * (including hybrid hash) falls back to a merge join. Without an enforced algorithm, the
 * choice is made from estimated input sizes.
 */
private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  EnforceProperty property = getAlgorithmEnforceProperty(context.getEnforcer(), node);
  if (property == null) {
    return createBestInnerJoinPlan(context, plan, node, leftExec, rightExec);
  }

  JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
  switch (algorithm) {
    case NESTED_LOOP_JOIN:
      LOG.info("Join (" + node.getPID() +") chooses [Nested Loop Join]");
      return new NLJoinExec(context, node, leftExec, rightExec);
    case BLOCK_NESTED_LOOP_JOIN:
      LOG.info("Join (" + node.getPID() +") chooses [Block Nested Loop Join]");
      return new BNLJoinExec(context, node, leftExec, rightExec);
    case IN_MEMORY_HASH_JOIN:
      LOG.info("Join (" + node.getPID() +") chooses [In-memory Hash Join]");
      return new HashJoinExec(context, node, leftExec, rightExec);
    case MERGE_JOIN:
      LOG.info("Join (" + node.getPID() +") chooses [Sort Merge Join]");
      return createMergeInnerJoin(context, node, leftExec, rightExec);
    default:
      // HYBRID_HASH_JOIN and any unknown algorithm are not supported for inner joins.
      LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
      LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
      return createMergeInnerJoin(context, node, leftExec, rightExec);
  }
}
/**
 * Chooses an inner join operator from estimated input sizes when no algorithm is enforced.
 * <p>
 * If either input's estimated size is below
 * {@code EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD}, an in-memory hash join is planned with
 * the smaller input as the inner relation; ties put the left input on the inner side.
 * Otherwise a sort-merge join is planned.
 *
 * @param context   the task attempt context (fragments used for size estimation)
 * @param plan      the execution plan used to find the join's children
 * @param node      the logical join node
 * @param leftExec  physical operator for the left input
 * @param rightExec physical operator for the right input
 * @return the chosen physical join operator
 * @throws IOException if size estimation or operator construction fails
 */
private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  List<LogicalNode> childs = plan.getChilds(node);
  String [] leftLineage = PlannerUtil.getRelationLineage(plan, childs.get(0));
  String [] rightLineage = PlannerUtil.getRelationLineage(plan, childs.get(1));
  long leftSize = estimateSizeRecursive(context, leftLineage);
  long rightSize = estimateSizeRecursive(context, rightLineage);
  final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);

  if (leftSize >= threshold && rightSize >= threshold) {
    // Both inputs are too large to hold in memory.
    return createMergeInnerJoin(context, node, leftExec, rightExec);
  }

  // HashJoinExec loads the inner relation to memory, so the smaller input becomes the inner.
  boolean leftIsSmaller = leftSize <= rightSize;
  PhysicalExec selectedInner = leftIsSmaller ? leftExec : rightExec;
  PhysicalExec selectedOuter = leftIsSmaller ? rightExec : leftExec;
  LOG.info("Join (" + node.getPID() +") chooses [InMemory Hash Join]");
  return new HashJoinExec(context, node, selectedOuter, selectedInner);
}
/**
 * Plans a sort-merge inner join. Both inputs are sorted by an {@link ExternalSortExec} on the
 * keys that {@code PlannerUtil.getSortKeysFromJoinQual} derives from the join qualifier, and
 * the sorted streams feed a {@link MergeJoinExec}.
 */
private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  SortSpec[][] joinKeys = PlannerUtil.getSortKeysFromJoinQual(
      plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
  SortSpec[] leftKeys = joinKeys[0];
  SortSpec[] rightKeys = joinKeys[1];

  SortNode leftSortNode = new SortNode(UNGENERATED_PID, leftKeys, leftExec.getSchema(), leftExec.getSchema());
  ExternalSortExec sortedLeft = new ExternalSortExec(context, sm, leftSortNode, leftExec);
  SortNode rightSortNode = new SortNode(UNGENERATED_PID, rightKeys, rightExec.getSchema(), rightExec.getSchema());
  ExternalSortExec sortedRight = new ExternalSortExec(context, sm, rightSortNode, rightExec);

  LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
  return new MergeJoinExec(context, plan, sortedLeft, sortedRight, leftKeys, rightKeys);
}
/**
 * Plans a left outer join. An enforced in-memory hash join or nested loop join is honored.
 * Any other enforced algorithm is logged and replaced by a hash left outer join. Without an
 * enforced algorithm, the choice is made from the estimated size of the right input.
 */
private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  Enforcer enforcer = context.getEnforcer();
  EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
  if (property == null) {
    return createBestLeftOuterJoinPlan(context, plan, join, leftExec, rightExec);
  }

  JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
  switch (algorithm) {
    case IN_MEMORY_HASH_JOIN:
      LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
      return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
    case NESTED_LOOP_JOIN:
      LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
      return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
    default:
      LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
      // The fallback is a left outer join, not an inner join as the old message claimed.
      LOG.error("Choose a fallback left outer join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
      return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
  }
}
/**
 * Chooses a left outer join operator from the estimated size of the right input.
 * <p>
 * If the right input is below {@code EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD}, a hash left
 * outer join is used. Otherwise a nested loop left outer join is used.
 * <p>
 * The right child's relation lineage is resolved through the {@link ExecutionPlan}, as in
 * {@code createBestInnerJoinPlan}, because children in this planner are navigated via the plan.
 */
private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  String [] rightLineage = PlannerUtil.getRelationLineage(plan, plan.getChild(join, 1));
  long rightTableVolume = estimateSizeRecursive(context, rightLineage);

  if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
    // we can implement left outer join using hash join, using the right operand as the build relation
    LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
    return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
  }

  // the right operand is too large, so we opt for NL implementation of left outer join
  LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
  return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
}
/**
 * Chooses a right outer join operator from the estimated size of the left input.
 * <p>
 * If the left input is below {@code EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD}, the join
 * runs as a hash left outer join with the operands exchanged. That is blocking, but merge join
 * is blocking as well. Otherwise a right outer merge join is planned.
 * <p>
 * The left child's relation lineage is resolved through the {@link ExecutionPlan}, as in
 * {@code createBestInnerJoinPlan}, because children in this planner are navigated via the plan.
 */
private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  String [] leftLineage = PlannerUtil.getRelationLineage(plan, plan.getChild(join, 0));
  long leftSize = estimateSizeRecursive(context, leftLineage);

  if (leftSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
    LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
    return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
  }
  return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
/**
 * Plans a right outer join as a sort-merge join. Both inputs are externally sorted on the keys
 * derived from the join qualifier and merged by a {@link RightOuterMergeJoinExec}.
 */
private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  //the left operand is too large, so opt for merge join implementation
  LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
  SortSpec[][] joinKeys = PlannerUtil.getSortKeysFromJoinQual(
      plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
  SortSpec[] leftKeys = joinKeys[0];
  SortSpec[] rightKeys = joinKeys[1];

  SortNode leftSortNode = new SortNode(UNGENERATED_PID, leftKeys, leftExec.getSchema(), leftExec.getSchema());
  ExternalSortExec sortedLeft = new ExternalSortExec(context, sm, leftSortNode, leftExec);
  SortNode rightSortNode = new SortNode(UNGENERATED_PID, rightKeys, rightExec.getSchema(), rightExec.getSchema());
  ExternalSortExec sortedRight = new ExternalSortExec(context, sm, rightSortNode, rightExec);

  return new RightOuterMergeJoinExec(context, plan, sortedLeft, sortedRight, leftKeys, rightKeys);
}
/**
 * Plans a right outer join. An enforced in-memory hash join becomes a hash left outer join
 * with exchanged operands. An enforced merge join, or any other enforced algorithm, becomes a
 * right outer merge join; the latter is logged as invalid. Without an enforced algorithm, the
 * choice is made from estimated sizes.
 */
private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  EnforceProperty property = getAlgorithmEnforceProperty(context.getEnforcer(), join);
  if (property == null) {
    return createBestRightJoinPlan(context, plan, join, leftExec, rightExec);
  }

  JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
  if (algorithm == JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
    LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
    return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
  }
  if (algorithm != JoinAlgorithm.MERGE_JOIN) {
    LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
    LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
  }
  return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
/**
 * Plans a full outer join. An enforced in-memory hash join yields a hash full outer join. An
 * enforced merge join, or any other enforced algorithm, yields a merge full outer join; the
 * latter is logged as invalid. Without an enforced algorithm, the choice is made from
 * estimated sizes.
 */
private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  EnforceProperty property = getAlgorithmEnforceProperty(context.getEnforcer(), join);
  if (property == null) {
    return createBestFullOuterJoinPlan(context, plan, join, leftExec, rightExec);
  }

  JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
  if (algorithm == JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
    return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
  }
  if (algorithm != JoinAlgorithm.MERGE_JOIN) {
    LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
    LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
  }
  return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
/**
 * Plans a full outer join as a {@link HashFullOuterJoinExec}.
 * <p>
 * Per the original design note, the executor loads the smaller relation into memory. That
 * relation is passed as the last operand; on equal estimates the left input goes there. Child
 * lineages are resolved through the {@link ExecutionPlan}, as in
 * {@code createBestInnerJoinPlan}.
 */
private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec)
    throws IOException {
  String [] leftLineage = PlannerUtil.getRelationLineage(plan, plan.getChild(join, 0));
  String [] rightLineage = PlannerUtil.getRelationLineage(plan, plan.getChild(join, 1));
  long leftSize = estimateSizeRecursive(context, leftLineage);
  long rightSize = estimateSizeRecursive(context, rightLineage);

  PhysicalExec smaller;
  PhysicalExec larger;
  if (leftSize <= rightSize) {
    smaller = leftExec;
    larger = rightExec;
  } else {
    smaller = rightExec;
    larger = leftExec;
  }
  LOG.info("Full Outer Join (" + join.getPID() +") chooses [Hash Join]");
  return new HashFullOuterJoinExec(context, join, larger, smaller);
}
/**
 * Plans a full outer join as a sort-merge join. Both inputs are externally sorted on keys
 * derived from the join qualifier and merged by a {@link MergeFullOuterJoinExec}.
 */
private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
    PhysicalExec leftExec, PhysicalExec rightExec)
    throws IOException {
  // if size too large, full outer merge join implementation
  LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]");
  SortSpec[][] joinKeys = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(),
      leftExec.getSchema(), rightExec.getSchema());
  SortSpec[] leftKeys = joinKeys[0];
  SortSpec[] rightKeys = joinKeys[1];

  SortNode leftSortNode = new SortNode(UNGENERATED_PID, leftKeys, leftExec.getSchema(), leftExec.getSchema());
  ExternalSortExec sortedLeft = new ExternalSortExec(context, sm, leftSortNode, leftExec);
  SortNode rightSortNode = new SortNode(UNGENERATED_PID, rightKeys, rightExec.getSchema(), rightExec.getSchema());
  ExternalSortExec sortedRight = new ExternalSortExec(context, sm, rightSortNode, rightExec);

  return new MergeFullOuterJoinExec(context, plan, sortedLeft, sortedRight, leftKeys, rightKeys);
}
/**
 * Chooses a full outer join operator from estimated input sizes when no algorithm is enforced.
 * <p>
 * If either input is below a fixed 128 MiB threshold, a hash full outer join is planned.
 * Otherwise a merge full outer join is planned. Child lineages are resolved through the
 * {@link ExecutionPlan}, as in {@code createBestInnerJoinPlan}.
 */
private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  String [] leftLineage = PlannerUtil.getRelationLineage(plan, plan.getChild(join, 0));
  String [] rightLineage = PlannerUtil.getRelationLineage(plan, plan.getChild(join, 1));
  long leftSize = estimateSizeRecursive(context, leftLineage);
  long rightSize = estimateSizeRecursive(context, rightLineage);

  // 128 MiB. NOTE(review): unlike the other join planners, this threshold is not read from
  // TajoConf; consider making it configurable.
  final long threshold = 128L * 1024 * 1024;
  if (leftSize < threshold || rightSize < threshold) {
    return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
  }
  return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
/**
 * Plans a left semi join.
 * <p>
 * Left semi join means that the left side is the IN side table, and the right side is the FROM
 * side table. The join is always executed by a {@link HashLeftSemiJoinExec}. An unsupported
 * enforced algorithm is logged and falls back to that same operator. Falling back to an outer
 * join, as before, would emit unmatched rows and give wrong semi-join results.
 */
private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  Enforcer enforcer = context.getEnforcer();
  EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
  if (property != null) {
    JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
    if (algorithm != JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
      LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
      LOG.error("Choose a fallback left semi join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
      return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
    }
  }
  LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
  return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
}
/**
 * Plans a right semi join.
 * <p>
 * Right semi join means that the left side is the FROM side table, and the right side is the IN
 * side table. It is executed as a {@link HashLeftSemiJoinExec} with the operands swapped. An
 * unsupported enforced algorithm is logged and falls back to that same operator, not to an
 * outer join, which would give wrong semi-join results.
 */
private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  Enforcer enforcer = context.getEnforcer();
  EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
  if (property != null) {
    JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
    if (algorithm != JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
      LOG.error("Invalid Right Semi Join Algorithm Enforcer: " + algorithm.name());
      LOG.error("Choose a fallback right semi join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
      return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
    }
  }
  LOG.info("Right Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
  return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
}
/**
 * Plans a left anti join, always executed by a {@link HashLeftAntiJoinExec} over
 * (left, right). An unsupported enforced algorithm is logged and falls back to the same operator.
 * <p>
 * NOTE(review): the previous comment described the left side as the FROM side table and the
 * right side as the IN side table; confirm the operand roles against HashLeftAntiJoinExec.
 */
private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  Enforcer enforcer = context.getEnforcer();
  EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
  if (property != null) {
    JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
    if (algorithm != JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
      LOG.error("Invalid Left Anti Join Algorithm Enforcer: " + algorithm.name());
      LOG.error("Choose a fallback left anti join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
      return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
    }
  }
  LOG.info("Left Anti Join (" + join.getPID() +") chooses [In Memory Hash Join].");
  return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
}
/**
 * Plans a right anti join. It is executed as a left anti join with the operands swapped,
 * mirroring how {@code createRightSemiJoinPlan} builds a right semi join.
 * <p>
 * Previously this method built a {@link HashLeftSemiJoinExec} (a semi join, which returns the
 * matching rows instead of the non-matching ones) and fell back to an outer join. Both are
 * replaced with {@link HashLeftAntiJoinExec}.
 */
private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
  Enforcer enforcer = context.getEnforcer();
  EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
  if (property != null) {
    JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
    if (algorithm != JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
      LOG.error("Invalid Right Anti Join Algorithm Enforcer: " + algorithm.name());
      LOG.error("Choose a fallback right anti join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
      return new HashLeftAntiJoinExec(context, join, rightExec, leftExec);
    }
  }
  LOG.info("Right Anti Join (" + join.getPID() +") chooses [In Memory Hash Join].");
  return new HashLeftAntiJoinExec(context, join, rightExec, leftExec);
}
/**
 * Builds the physical operator that writes the output of {@code subOp} for a store node.
 * <ul>
 *   <li>HASH/RANGE partitioned stores are resolved against the outgoing data channel whose
 *       source PID equals the store node's PID. A hash channel yields a
 *       {@link PartitionedStoreExec}. A range channel yields an {@link IndexedStoreExec} keyed
 *       by the sort specs of a {@link SortExec} found under {@code subOp}, or by the channel's
 *       partition keys when no such executor is found.</li>
 *   <li>A {@link StoreIndexNode} yields a {@link TunnelExec}.</li>
 *   <li>A column-partitioned table yields a {@link ColumnPartitionedTableStoreExec}.</li>
 *   <li>Everything else, including the not-yet-supported HASH/RANGE/LIST table
 *       partitionings, yields a plain {@link StoreTableExec}.</li>
 * </ul>
 *
 * @param ctx   the task attempt context supplying the outgoing channels
 * @param plan  the logical store node
 * @param subOp the operator producing the rows to store
 * @return the physical store operator
 * @throws IOException if building the operator fails; a {@link PhysicalPlanningException} is
 *         thrown when a HASH/RANGE partitioned store has no matching outgoing channel
 */
public PhysicalExec createStorePlan(TaskAttemptContext ctx,
    StoreTableNode plan, PhysicalExec subOp) throws IOException {
  if (plan.getPartitionType() == PartitionType.HASH_PARTITION
      || plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
    DataChannel channel = null;
    for (DataChannel outChannel : ctx.getOutgoingChannels()) {
      if (outChannel.getSrcPID() == plan.getPID()) {
        channel = outChannel;
        break;
      }
    }
    if (channel == null) {
      // Previously this dereferenced null and failed with a bare NullPointerException.
      throw new PhysicalPlanningException(
          "Cannot find an outgoing data channel for the store node (" + plan.getPID() + ")");
    }

    switch (channel.getPartitionType()) {
      case HASH_PARTITION:
        return new PartitionedStoreExec(ctx, sm, plan, subOp);
      case RANGE_PARTITION: {
        SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
        SortSpec [] sortSpecs;
        if (sortExec != null) {
          sortSpecs = sortExec.getSortSpecs();
        } else {
          Column[] columns = channel.getPartitionKey();
          sortSpecs = new SortSpec[columns.length];
          for (int i = 0; i < columns.length; i++) {
            sortSpecs[i] = new SortSpec(columns[i]);
          }
        }
        return new IndexedStoreExec(ctx, sm, subOp,
            plan.getInSchema(), plan.getInSchema(), sortSpecs);
      }
      default:
        // Other channel partition types fall through to the cases below.
        break;
    }
  }

  if (plan instanceof StoreIndexNode) {
    return new TunnelExec(ctx, plan.getOutSchema(), subOp);
  }

  // Only column-partitioned tables have a dedicated store operator so far.
  // TODO: HASH, RANGE and LIST partitioned tables are currently stored as plain tables.
  if (plan.getPartitions() != null
      && plan.getPartitions().getPartitionsType() == CatalogProtos.PartitionsType.COLUMN) {
    return new ColumnPartitionedTableStoreExec(ctx, plan, subOp);
  }
  return new StoreTableExec(ctx, plan, subOp);
}
/**
 * Creates a sequential scan executor over the fragments that the task context holds for the
 * scan node's canonical name.
 *
 * @param ctx      task context holding the table fragments
 * @param scanNode the scan node to plan
 * @return a {@code SeqScanExec} over the node's fragments
 * @throws NullPointerException if the context has no table registered under the canonical name
 * @throws IOException propagated from the executor constructor
 */
public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode) throws IOException {
  final String canonicalName = scanNode.getCanonicalName();
  Preconditions.checkNotNull(ctx.getTable(canonicalName),
      "Error: There is no table matched to %s", canonicalName + "(" + scanNode.getTableName() + ")");

  final FragmentProto[] scanFragments = ctx.getTables(canonicalName);
  return new SeqScanExec(ctx, sm, scanNode, scanFragments);
}
/**
 * Creates the aggregation executor for a group-by node.
 *
 * <p>If the task's enforcer carries a group-by property for this node, that property decides the
 * algorithm. {@code HASH_AGGREGATION} selects hash aggregation; any other algorithm selects sort
 * aggregation, which is passed the enforced property. Without such a property, the choice is
 * delegated to {@code createBestAggregationPlan}.
 *
 * @param context task context providing the enforcer
 * @param plan    the execution plan containing {@code groupbyNode}
 * @param groupbyNode the group-by node to plan
 * @param subOp   the executor producing the input tuples
 * @return the aggregation executor
 * @throws IOException propagated from the chosen executor factory
 */
public PhysicalExec createGroupByPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
                                      PhysicalExec subOp)
    throws IOException {
  EnforceProperty enforced = getAlgorithmEnforceProperty(context.getEnforcer(), groupbyNode);
  if (enforced == null) {
    return createBestAggregationPlan(context, plan, groupbyNode, subOp);
  }

  boolean useHash = enforced.getGroupby().getAlgorithm() == GroupbyAlgorithm.HASH_AGGREGATION;
  return useHash
      ? createInMemoryHashAggregation(context, groupbyNode, subOp)
      : createSortAggregation(context, enforced, groupbyNode, subOp);
}
/**
 * Logs the choice and creates a {@code HashAggregateExec} for {@code groupbyNode} over {@code subOp}.
 *
 * @param ctx         task context
 * @param groupbyNode the group-by node to plan
 * @param subOp       the executor producing the input tuples
 * @return the hash aggregation executor
 * @throws IOException propagated from the executor constructor
 */
private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx, GroupbyNode groupbyNode,
                                                   PhysicalExec subOp) throws IOException {
  LOG.info("The planner chooses [Hash Aggregation]");
  PhysicalExec hashAggregation = new HashAggregateExec(ctx, groupbyNode, subOp);
  return hashAggregation;
}
/**
 * Creates a sort-based aggregation: an {@code ExternalSortExec} over {@code subOp} feeding a
 * {@code SortAggregateExec}.
 *
 * <p>The sort keys are the grouping columns, each built as {@code new SortSpec(column, true, false)}
 * (presumably ascending with nulls last; confirm against {@code SortSpec}). These are followed by
 * any sort specs listed in the enforced group-by property.
 *
 * @param ctx         task context
 * @param property    enforced group-by property whose sort specs are appended; may be {@code null}
 * @param groupbyNode the group-by node to plan
 * @param subOp       the executor producing the input tuples
 * @return the sort aggregation executor
 * @throws IOException propagated from the executor constructors
 */
private PhysicalExec createSortAggregation(TaskAttemptContext ctx, EnforceProperty property, GroupbyNode groupbyNode, PhysicalExec subOp)
    throws IOException {
  Column[] grpColumns = groupbyNode.getGroupingColumns();
  SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
  for (int i = 0; i < grpColumns.length; i++) {
    sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
  }

  if (property != null) {
    // Append the enforced sort specs after the grouping-column specs.
    List<SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
    SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
    for (int i = 0; i < enforcedSortSpecs.length; i++) {
      enforcedSortSpecs[i] = new SortSpec(sortSpecProtos.get(i));
    }
    sortSpecs = ObjectArrays.concat(sortSpecs, enforcedSortSpecs, SortSpec.class);
  }

  // A standalone sort node (PID -1) whose input and output schemas equal the child's schema.
  SortNode sortNode = new SortNode(-1, sortSpecs);
  sortNode.setInSchema(subOp.getSchema());
  sortNode.setOutSchema(subOp.getSchema());
  ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
  LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
  return new SortAggregateExec(ctx, groupbyNode, sortExec);
}
/**
 * Chooses an aggregation algorithm for {@code groupbyNode} when none is enforced.
 *
 * <p>Hash aggregation is chosen if the node has no grouping columns. It is also chosen if the
 * estimated size of the relations under the node's first child does not exceed
 * {@code EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD}. Otherwise sort aggregation is used.
 *
 * @param context     task context used for size estimation
 * @param plan        the execution plan containing {@code groupbyNode}
 * @param groupbyNode the group-by node to plan
 * @param subOp       the executor producing the input tuples
 * @return the aggregation executor
 * @throws IOException propagated from size estimation or executor creation
 */
private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
                                               PhysicalExec subOp) throws IOException {
  if (groupbyNode.getGroupingColumns().length == 0) {
    return createInMemoryHashAggregation(context, groupbyNode, subOp);
  }

  String [] outerLineage = PlannerUtil.getRelationLineage(plan, plan.getChilds(groupbyNode).get(0));
  long estimatedSize = estimateSizeRecursive(context, outerLineage);
  final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);

  // If the estimated size does not exceed the threshold, use hash aggregation.
  // createInMemoryHashAggregation logs the choice itself, so no extra log line here.
  if (estimatedSize <= threshold) {
    return createInMemoryHashAggregation(context, groupbyNode, subOp);
  } else {
    return createSortAggregation(context, null, groupbyNode, subOp);
  }
}
/**
 * Creates the sort executor for a sort node.
 *
 * <p>If the task's enforcer carries a sort property for this node, {@code IN_MEMORY_SORT} selects a
 * {@code MemSortExec} and any other algorithm selects an {@code ExternalSortExec}. Without such a
 * property, the choice is delegated to {@code createBestSortPlan}.
 *
 * @param context  task context providing the enforcer
 * @param plan     the execution plan containing {@code sortNode}
 * @param sortNode the sort node to plan
 * @param child    the executor producing the input tuples
 * @return the sort executor
 * @throws IOException propagated from the executor constructors
 */
public PhysicalExec createSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
                                   PhysicalExec child) throws IOException {
  EnforceProperty enforced = getAlgorithmEnforceProperty(context.getEnforcer(), sortNode);
  if (enforced == null) {
    return createBestSortPlan(context, plan, sortNode, child);
  }

  boolean inMemory = enforced.getSort().getAlgorithm() == SortEnforce.SortAlgorithm.IN_MEMORY_SORT;
  if (!inMemory) {
    return new ExternalSortExec(context, sm, sortNode, child);
  }
  return new MemSortExec(context, sortNode, child);
}
/**
 * Chooses between in-memory and external sort for {@code sortNode} when no algorithm is enforced.
 *
 * <p>An in-memory sort is chosen if the estimated size of the relations under the node's first
 * child does not exceed a fixed threshold of {@code 2000 * 2^20}. Otherwise an external sort is used.
 *
 * @param context  task context used for size estimation
 * @param plan     the execution plan containing {@code sortNode}
 * @param sortNode the sort node to plan
 * @param child    the executor producing the input tuples
 * @return the sort executor
 * @throws IOException propagated from size estimation or executor creation
 */
public SortExec createBestSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
                                   PhysicalExec child) throws IOException {
  String [] outerLineage = PlannerUtil.getRelationLineage(plan, plan.getChilds(sortNode).get(0));
  long estimatedSize = estimateSizeRecursive(context, outerLineage);
  // 2000 * 2^20 (presumably 2000 MiB if estimateSizeRecursive reports bytes; confirm). The product
  // is computed in long arithmetic so it cannot overflow int if the factor is ever raised.
  final long threshold = 2000L * 1024 * 1024;

  // If the estimated size does not exceed the threshold, sort in memory; otherwise sort externally.
  if (estimatedSize <= threshold) {
    return new MemSortExec(context, sortNode, child);
  } else {
    return new ExternalSortExec(context, sm, sortNode, child);
  }
}
/**
 * Creates a {@code BSTIndexScanExec} for an index scan node.
 *
 * <p>Only the first fragment registered for the scanned table is used. The index file name comes
 * from {@code IndexUtil.getIndexNameOfFrag} and is resolved under the {@code index} directory of
 * the table path. The fragment store type is taken from the first incoming data channel.
 *
 * @param ctx        task context holding the table fragments and incoming channels
 * @param annotation the index scan node to plan
 * @return the index scan executor
 * @throws NullPointerException if the context has no table registered under the canonical name
 * @throws IllegalStateException if no fragment is available for the table
 * @throws IOException propagated from fragment conversion or the executor constructor
 */
public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
                                        IndexScanNode annotation)
    throws IOException {
  //TODO-general Type Index
  String canonicalName = annotation.getCanonicalName();
  Preconditions.checkNotNull(ctx.getTable(canonicalName),
      "Error: There is no table matched to %s", canonicalName);

  // Fetch fragments with the same key that was just validated, as createScanPlan does.
  // Using getTableName() here could miss the entry when it differs from the canonical name.
  FragmentProto[] fragmentProtos = ctx.getTables(canonicalName);
  List<FileFragment> fragments =
      FragmentConvertor.convert(ctx.getConf(), ctx.getIncomingChannels().get(0).getStoreType(), fragmentProtos);
  Preconditions.checkState(!fragments.isEmpty(), "Error: There is no fragment for %s", canonicalName);
  FileFragment fragment = fragments.get(0);

  String indexName = IndexUtil.getIndexNameOfFrag(fragment, annotation.getSortKeys());
  Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
  TupleComparator comp = new TupleComparator(annotation.getKeySchema(), annotation.getSortKeys());
  return new BSTIndexScanExec(ctx, sm, annotation, fragment, new Path(indexPath, indexName),
      annotation.getKeySchema(), comp, annotation.getDatum());
}
/**
 * Looks up the algorithm-enforcing property that {@code enforcer} holds for {@code node}.
 *
 * <p>Only join, group-by and sort nodes are considered. Among the enforcer's properties of the
 * corresponding type, those whose PID equals the node's PID match. If several match, the last one
 * in the enforcer's list is returned.
 *
 * @param enforcer the enforcer to consult; may be {@code null}
 * @param node     the logical node being planned
 * @return the matching property, or {@code null} if there is none
 */
private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
  if (enforcer == null) {
    return null;
  }

  final EnforceType enforceType;
  NodeType nodeType = node.getType();
  if (nodeType == NodeType.JOIN) {
    enforceType = EnforceType.JOIN;
  } else if (nodeType == NodeType.GROUP_BY) {
    enforceType = EnforceType.GROUP_BY;
  } else if (nodeType == NodeType.SORT) {
    enforceType = EnforceType.SORT;
  } else {
    return null;
  }

  if (!enforcer.hasEnforceProperty(enforceType)) {
    return null;
  }

  EnforceProperty lastMatch = null;
  for (EnforceProperty candidate : enforcer.getEnforceProperties(enforceType)) {
    boolean targetsNode;
    switch (enforceType) {
      case JOIN:
        targetsNode = candidate.getJoin().getPid() == node.getPID();
        break;
      case GROUP_BY:
        targetsNode = candidate.getGroupby().getPid() == node.getPID();
        break;
      case SORT:
        targetsNode = candidate.getSort().getPid() == node.getPID();
        break;
      default:
        targetsNode = false;
        break;
    }
    if (targetsNode) {
      lastMatch = candidate;
    }
  }
  return lastMatch;
}
}