/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.global;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.global.ExecutionPlan.PlanGroup;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.storage.AbstractStorageManager;
import java.io.IOException;
import java.util.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
/**
 * Builds a master plan, which is a DAG of ExecutionBlocks connected by
 * DataChannels, from a logical plan.
 */
public class GlobalPlanner {
private static final Log LOG = LogFactory.getLog(GlobalPlanner.class);
private TajoConf conf;
private CatalogProtos.StoreType storeType;
public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
throws IOException {
this.conf = conf;
this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
Preconditions.checkArgument(storeType != null);
}
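/**
 * Shared state for a single global planning run: the master plan under
 * construction, and a map from a logical node's PID to the execution block
 * that currently produces that node's output.
 */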
public class GlobalPlanContext {
MasterPlan plan;
Map<Integer, ExecutionBlock> execBlockMap = Maps.newHashMap();
}
/**
* Builds a master plan from the given logical plan.
*/
public void build(MasterPlan masterPlan) throws IOException, PlanningException {
DistributedPlannerVisitor planner = new DistributedPlannerVisitor();
GlobalPlanContext globalPlanContext = new GlobalPlanContext();
globalPlanContext.plan = masterPlan;
LOG.info(masterPlan.getLogicalPlan());
LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan().getPidFactory(),
masterPlan.getLogicalPlan().getRootBlock().getRoot());
LogicalNode lastNode = planner.visitChild(globalPlanContext, masterPlan.getLogicalPlan(), inputPlan,
new Stack<LogicalNode>());
ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
if (childExecBlock.getPlan().hasPlanGroup()) {
ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
dataChannel.setStoreType(CatalogProtos.StoreType.CSV);
dataChannel.setSchema(lastNode.getOutSchema());
DataChannel.connectPlanGroups(childExecBlock.getPlan().getFirstPlanGroup(),
null, dataChannel);
masterPlan.addConnect(dataChannel);
masterPlan.setTerminal(terminalBlock);
} else {
masterPlan.setTerminal(childExecBlock);
}
LOG.info(masterPlan);
}
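/**
 * Creates a ScanNode that reads the intermediate data delivered by the given
 * channel. The scan's virtual table is named after the channel's source block
 * and uses the channel's schema and store type.
 */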
public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
"Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
TableMeta meta = new TableMeta(channel.getStoreType(), new Options());
TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/"));
return new ScanNode(plan.newPID(), desc);
}
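/**
 * Creates a hash-partitioned channel from one join input block to the parent
 * block. For non-cross joins, the join keys of the corresponding side are
 * used as the partition key.
 */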
private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
ExecutionBlock parent, JoinNode join, boolean leftTable) {
ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
channel.setSchema(childBlock.getPlan().getFirstPlanGroup().getRootNode().getOutSchema());
channel.setStoreType(storeType);
if (join.getJoinType() != JoinType.CROSS) {
// Each child block has exactly one output schema, so the join keys can be
// resolved against the root output schema of each side.
Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
leftBlock.getPlan().getFirstPlanGroup().getRootNode().getOutSchema(),
rightBlock.getPlan().getFirstPlanGroup().getRootNode().getOutSchema());
if (leftTable) {
channel.setPartitionKey(joinColumns[0]);
} else {
channel.setPartitionKey(joinColumns[1]);
}
}
return channel;
}
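/**
 * Builds the execution block(s) for a join. If both inputs are raw scans and
 * either input table is smaller than DIST_QUERY_BROADCAST_JOIN_THRESHOLD, the
 * join is executed as a broadcast join within a single block. Otherwise, a
 * symmetric repartition join is built: both children are hash-shuffled on
 * their join keys into a new block that performs the join.
 */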
private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
ExecutionBlock leftBlock, ExecutionBlock rightBlock)
throws PlanningException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
LogicalNode leftNode = joinNode.getLeftChild();
LogicalNode rightNode = joinNode.getRightChild();
boolean leftBroadcasted = false;
boolean rightBroadcasted = false;
if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN) {
ScanNode leftScan = (ScanNode) leftNode;
ScanNode rightScan = (ScanNode) rightNode;
TableDesc leftDesc = leftScan.getTableDesc();
TableDesc rightDesc = rightScan.getTableDesc();
long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
if (leftDesc.getStats().getNumBytes() < broadcastThreshold) {
leftBroadcasted = true;
}
if (rightDesc.getStats().getNumBytes() < broadcastThreshold) {
rightBroadcasted = true;
}
if (leftBroadcasted || rightBroadcasted) {
currentBlock = masterPlan.newExecutionBlock();
currentBlock.setPlan(joinNode);
if (leftBroadcasted) {
currentBlock.addBroadcastTable(leftScan.getCanonicalName());
}
if (rightBroadcasted) {
currentBlock.addBroadcastTable(rightScan.getCanonicalName());
}
context.execBlockMap.remove(leftScan.getPID());
context.execBlockMap.remove(rightScan.getPID());
return currentBlock;
}
}
// symmetric repartition join
currentBlock = masterPlan.newExecutionBlock();
DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
joinNode.setLeftChild(leftScan);
joinNode.setRightChild(rightScan);
currentBlock.setPlan(joinNode);
DataChannel.connectPlanGroups(leftBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), leftChannel);
DataChannel.connectPlanGroups(rightBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), rightChannel);
masterPlan.addConnect(leftChannel);
masterPlan.addConnect(rightChannel);
return currentBlock;
}
/**
 * If a query contains a distinct aggregation function, it does not perform
 * pre-aggregation in the first phase. Instead, the first phase performs only
 * a hash shuffle, and the second phase performs a sort-based aggregation.
 * The non-distinct aggregation functions must then be evaluated as if they
 * were still in the first phase.
 */
private ExecutionBlock buildDistinctGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
GroupbyNode groupbyNode) {
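// Illustrative example (hypothetical query): for
// SELECT col1, count(distinct col2), sum(col3) FROM t GROUP BY col1,
// the first phase only shuffles rows by col1 and evaluates sum(col3) in its
// first-phase role, while count(distinct col2) is computed by the
// second-phase sort aggregation enforced below.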
// setup child block
LogicalNode topMostOfFirstPhase = groupbyNode.getChild();
childBlock.setPlan(topMostOfFirstPhase);
// setup current block
ExecutionBlock currentBlock = context.plan.newExecutionBlock();
LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
for (Target target : groupbyNode.getTargets()) {
List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
for (AggregationFunctionCallEval function : functions) {
if (function.isDistinct()) {
columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
} else {
// See the method comment: non-distinct aggregation functions must be evaluated as first-phase functions.
function.setFirstPhase();
}
}
}
// Set sort aggregation enforcer to the second groupby node
Set<Column> existingColumns = Sets.newHashSet(groupbyNode.getGroupingColumns());
columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
currentBlock.getEnforcer().enforceSortAggregation(groupbyNode.getPID(), sortSpecs);
// setup channel
DataChannel channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
channel.setPartitionKey(groupbyNode.getGroupingColumns());
channel.setSchema(topMostOfFirstPhase.getOutSchema());
channel.setStoreType(storeType);
// setup current block with channel
ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
groupbyNode.setChild(scanNode);
currentBlock.setPlan(groupbyNode);
DataChannel.connectPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), channel);
context.plan.addConnect(channel);
return currentBlock;
}
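/**
 * Builds a two-phase aggregation. Distinct aggregations are delegated to
 * buildDistinctGroupBy(). If the first-phase group-by sits on top of a table
 * subquery that is a UNION, the first phase is pushed down into each union
 * branch and the existing child block is reused as the second phase.
 * Otherwise, the first phase runs in the child block and the second phase
 * runs in a new block fed by a hash-partitioned channel on the grouping
 * columns.
 */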
private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
GroupbyNode groupbyNode)
throws PlanningException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
if (groupbyNode.isDistinct()) {
return buildDistinctGroupBy(context, childBlock, groupbyNode);
} else {
GroupbyNode firstPhaseGroupBy = PlannerUtil.transformGroupbyTo2P(groupbyNode);
firstPhaseGroupBy.setHavingCondition(null);
if (firstPhaseGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
((TableSubQueryNode)firstPhaseGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
currentBlock = childBlock;
for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
if (firstPhaseGroupBy.isEmptyGrouping()) {
dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 1);
} else {
dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 32);
}
dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), firstPhaseGroupBy);
LogicalNode topNodeOfSubBlock = subBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
g1.setChild(topNodeOfSubBlock);
subBlock.setPlan(g1);
GroupbyNode g2 = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), groupbyNode);
ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
g2.setChild(scanNode);
currentBlock.setPlan(g2);
DataChannel.connectPlanGroups(subBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), dataChannel);
}
} else { // general hash-shuffled aggregation
childBlock.setPlan(firstPhaseGroupBy);
currentBlock = masterPlan.newExecutionBlock();
int partitionNum = firstPhaseGroupBy.isEmptyGrouping() ? 1 : 32;
DataChannel channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, partitionNum);
channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
channel.setSchema(firstPhaseGroupBy.getOutSchema());
channel.setStoreType(storeType);
ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
groupbyNode.setChild(scanNode);
groupbyNode.setInSchema(scanNode.getOutSchema());
currentBlock.setPlan(groupbyNode);
DataChannel.connectPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), channel);
masterPlan.addConnect(channel);
}
}
return currentBlock;
}
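/**
 * Builds a two-phase sort: a local sort in the child block, a
 * range-partitioned shuffle on the sort keys, and a final sort in a new
 * execution block.
 */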
private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
SortNode firstSortNode = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), currentNode);
LogicalNode childBlockPlan = childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
firstSortNode.setChild(childBlockPlan);
// Sort is a non-projectable operator, so its input and output schemas are identical to those of its child.
firstSortNode.setInSchema(childBlockPlan.getOutSchema());
firstSortNode.setOutSchema(childBlockPlan.getOutSchema());
childBlock.setPlan(firstSortNode);
currentBlock = masterPlan.newExecutionBlock();
DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
channel.setSchema(firstSortNode.getOutSchema());
channel.setStoreType(storeType);
ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
currentNode.setChild(secondScan);
currentNode.setInSchema(secondScan.getOutSchema());
currentBlock.setPlan(currentNode);
DataChannel.connectPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), channel);
masterPlan.addConnect(channel);
return currentBlock;
}
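/**
 * Builds the execution block(s) for storing a result table. A non-partitioned
 * table is stored directly in the child block. A column-partitioned table is
 * hash-shuffled on the partition columns into a new block that performs the
 * store; other partition types are not supported yet.
 */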
private ExecutionBlock buildStorePlan(GlobalPlanContext context,
ExecutionBlock childBlock,
StoreTableNode currentNode)
throws PlanningException
{
PartitionDesc partitionDesc = currentNode.getPartitions();
// If the result table is not a partitioned table, store it directly within the child block.
if(partitionDesc == null) {
currentNode.setChild(childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
currentNode.setInSchema(childBlock.getPlan().getFirstPlanGroup().getRootNode().getOutSchema());
childBlock.setPlan(currentNode);
return childBlock;
}
// If the result table is a partitioned table:
// 1. replace the StoreTableNode with its child node;
// the old execution block now ends at the child node
LogicalNode childNode = currentNode.getChild();
childBlock.setPlan(childNode);
// 2. create a new execution block, and connect the two blocks with a DataChannel
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
DataChannel channel = null;
CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
Column[] columns = new Column[partitionDesc.getColumns().size()];
channel.setPartitionKey(partitionDesc.getColumns().toArray(columns));
channel.setSchema(childNode.getOutSchema());
channel.setStoreType(storeType);
} else {
throw new PlanningException(String.format("Unsupported PartitionsType: %s", partitionsType));
}
// 3. create a ScanNode for reading the shuffled data, with the
// StoreTableNode as the root node of the new execution block
ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
currentNode.setChild(scanNode);
currentNode.setInSchema(scanNode.getOutSchema());
currentBlock.setPlan(currentNode);
DataChannel.connectPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
currentBlock.getPlan().getFirstPlanGroup(), channel);
masterPlan.addConnect(channel);
return currentBlock;
}
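/**
 * Visits the logical plan bottom-up and cuts it into execution blocks,
 * recording in GlobalPlanContext.execBlockMap which block produces the
 * output of each visited node.
 */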
public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext, LogicalNode> {
@Override
public LogicalNode visitRoot(GlobalPlanContext context, LogicalPlan plan, LogicalRootNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitRoot(context, plan, node, stack);
return child;
}
@Override
public LogicalNode visitProjection(GlobalPlanContext context, LogicalPlan plan, ProjectionNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitProjection(context, plan, node, stack);
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
// execBlock.getPlan().build();
node.setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
node.setInSchema(execBlock.getPlan().getOutSchema(0));
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
}
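/**
 * If the limit's child is a sort, a copy of the limit is pushed down into the
 * child block and the connecting channel is narrowed to a single partition.
 * Otherwise, the limit is applied in the current block and re-applied in a
 * new single-partition block that gathers all intermediate results.
 */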
@Override
public LogicalNode visitLimit(GlobalPlanContext context, LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack)
throws PlanningException {
LogicalNode child = super.visitLimit(context, plan, node, stack);
ExecutionBlock block = context.execBlockMap.remove(child.getPID());
if (child.getType() == NodeType.SORT) {
node.setChild(block.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
block.setPlan(node);
ExecutionBlock childBlock = context.plan.getChild(block, 0);
LimitNode childLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
childLimit.setChild(childBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
childBlock.setPlan(childLimit);
DataChannel channel = context.plan.getChannel(childBlock, block);
channel.setPartitionNum(1);
DataChannel.connectPlanGroups(childBlock.getPlan().getFirstPlanGroup(),
block.getPlan().getFirstPlanGroup(), channel);
context.execBlockMap.put(node.getPID(), block);
} else {
node.setChild(block.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
block.setPlan(node);
ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
newChannel.setPartitionKey(new Column[]{});
newChannel.setSchema(node.getOutSchema());
newChannel.setStoreType(storeType);
ScanNode scanNode = buildInputExecutor(plan, newChannel);
LimitNode parentLimit = PlannerUtil.clone(context.plan.getLogicalPlan().getPidFactory(), node);
parentLimit.setChild(scanNode);
newExecBlock.setPlan(parentLimit);
DataChannel.connectPlanGroups(block.getPlan().getFirstPlanGroup(),
newExecBlock.getPlan().getFirstPlanGroup(), newChannel);
context.plan.addConnect(newChannel);
context.execBlockMap.put(parentLimit.getPID(), newExecBlock);
node = parentLimit;
}
return node;
}
@Override
public LogicalNode visitSort(GlobalPlanContext context, LogicalPlan plan, SortNode node, Stack<LogicalNode> stack)
throws PlanningException {
LogicalNode child = super.visitSort(context, plan, node, stack);
ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
ExecutionBlock newExecBlock = buildSortPlan(context, childBlock, node);
context.execBlockMap.put(node.getPID(), newExecBlock);
return node;
}
@Override
public LogicalNode visitGroupBy(GlobalPlanContext context, LogicalPlan plan, GroupbyNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitGroupBy(context, plan, node, stack);
ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
ExecutionBlock newExecBlock = buildGroupBy(context, childBlock, node);
context.execBlockMap.put(node.getPID(), newExecBlock);
return node;
}
@Override
public LogicalNode visitFilter(GlobalPlanContext context, LogicalPlan plan, SelectionNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitFilter(context, plan, node, stack);
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
// execBlock.getPlan().build();
node.setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
node.setInSchema(execBlock.getPlan().getOutSchema(0));
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
}
@Override
public LogicalNode visitJoin(GlobalPlanContext context, LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack)
throws PlanningException {
LogicalNode leftChild = visitChild(context, plan, node.getLeftChild(), stack);
LogicalNode rightChild = visitChild(context, plan, node.getRightChild(), stack);
ExecutionBlock leftChildBlock = context.execBlockMap.get(leftChild.getPID());
ExecutionBlock rightChildBlock = context.execBlockMap.get(rightChild.getPID());
ExecutionBlock newExecBlock = buildJoinPlan(context, node, leftChildBlock, rightChildBlock);
context.execBlockMap.put(node.getPID(), newExecBlock);
return node;
}
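/**
 * Merges consecutive UNIONs into a single execution block. Each non-union
 * child block becomes an input of that block, connected by a NONE_PARTITION
 * channel, so all branches are simply concatenated.
 */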
@Override
public LogicalNode visitUnion(GlobalPlanContext context, LogicalPlan plan, UnionNode node,
Stack<LogicalNode> stack) throws PlanningException {
stack.push(node);
LogicalNode leftChild = visitChild(context, plan, node.getLeftChild(), stack);
LogicalNode rightChild = visitChild(context, plan, node.getRightChild(), stack);
stack.pop();
List<ExecutionBlock> unionBlocks = Lists.newArrayList();
List<ExecutionBlock> queryBlockBlocks = Lists.newArrayList();
ExecutionBlock leftBlock = context.execBlockMap.remove(leftChild.getPID());
ExecutionBlock rightBlock = context.execBlockMap.remove(rightChild.getPID());
if (leftChild.getType() == NodeType.UNION) {
unionBlocks.add(leftBlock);
} else {
queryBlockBlocks.add(leftBlock);
}
if (rightChild.getType() == NodeType.UNION) {
unionBlocks.add(rightBlock);
} else {
queryBlockBlocks.add(rightBlock);
}
ExecutionBlock execBlock;
PlanGroup parentPlanGroup = null;
if (unionBlocks.size() == 0) {
execBlock = context.plan.newExecutionBlock();
} else {
execBlock = unionBlocks.get(0);
parentPlanGroup = execBlock.getPlan().getFirstPlanGroup();
}
for (ExecutionBlock childBlocks : unionBlocks) {
UnionNode union = (UnionNode) childBlocks.getPlan().getFirstPlanGroup().toLinkedLogicalNode();
queryBlockBlocks.add(context.execBlockMap.get(union.getLeftChild().getPID()));
queryBlockBlocks.add(context.execBlockMap.get(union.getRightChild().getPID()));
}
for (ExecutionBlock childBlocks : queryBlockBlocks) {
DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
channel.setSchema(childBlocks.getPlan().getFirstPlanGroup().getRootNode().getOutSchema());
channel.setStoreType(storeType);
if (childBlocks.getPlan().hasPlanGroup()) {
DataChannel.connectPlanGroups(childBlocks.getPlan().getFirstPlanGroup(),
parentPlanGroup, channel);
}
context.plan.addConnect(channel);
}
context.execBlockMap.put(node.getPID(), execBlock);
return node;
}
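/**
 * Re-registers the child's execution block under the given node's PID and
 * makes the node the block's plan root.
 */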
private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) {
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
// if (node instanceof UnaryNode) {
// ((UnaryNode)node).setChild(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
// } else if (node instanceof TableSubQueryNode) {
// ((TableSubQueryNode) node).setSubQuery(execBlock.getPlan().getFirstPlanGroup().toLinkedLogicalNode());
// }
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
}
@Override
public LogicalNode visitExcept(GlobalPlanContext context, LogicalPlan plan, ExceptNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitExcept(context, plan, node, stack);
return handleUnaryNode(context, child, node);
}
@Override
public LogicalNode visitIntersect(GlobalPlanContext context, LogicalPlan plan, IntersectNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitIntersect(context, plan, node, stack);
return handleUnaryNode(context, child, node);
}
@Override
public LogicalNode visitTableSubQuery(GlobalPlanContext context, LogicalPlan plan, TableSubQueryNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitTableSubQuery(context, plan, node, stack);
return handleUnaryNode(context, child, node);
}
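/**
 * A leaf scan starts a new execution block.
 */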
@Override
public LogicalNode visitScan(GlobalPlanContext context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
throws PlanningException {
ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
newExecBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), newExecBlock);
return node;
}
@Override
public LogicalNode visitStoreTable(GlobalPlanContext context, LogicalPlan plan, StoreTableNode node,
Stack<LogicalNode> stack) throws PlanningException {
LogicalNode child = super.visitStoreTable(context, plan, node, stack);
ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
ExecutionBlock newExecBlock = buildStorePlan(context, childBlock, node);
context.execBlockMap.put(node.getPID(), newExecBlock);
return node;
}
@Override
public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, InsertNode node,
Stack<LogicalNode> stack)
throws PlanningException {
LogicalNode child = super.visitInsert(context, plan, node, stack);
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
return node;
}
}
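/**
 * Accumulates the UNION nodes found by ConsecutiveUnionFinder.
 */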
private class UnionsFinderContext {
List<UnionNode> unionList = new ArrayList<UnionNode>();
}
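/**
 * Collects a run of consecutive UNION nodes, descending only into children
 * whose table subqueries are themselves UNIONs. Currently unused.
 */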
@SuppressWarnings("unused")
private class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<UnionsFinderContext, LogicalNode> {
@Override
public LogicalNode visitUnion(UnionsFinderContext context, LogicalPlan plan, UnionNode node,
Stack<LogicalNode> stack)
throws PlanningException {
context.unionList.add(node);
stack.push(node);
TableSubQueryNode leftSubQuery = node.getLeftChild();
TableSubQueryNode rightSubQuery = node.getRightChild();
if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
visitChild(context, plan, leftSubQuery, stack);
}
if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
visitChild(context, plan, rightSubQuery, stack);
}
stack.pop();
return node;
}
}
}