blob: 8a7229b3e238efd7e3371e75d0a6946724173994 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package org.apache.tajo.engine.planner.global;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.util.graph.DirectedGraphVisitor;
import org.apache.tajo.util.graph.SimpleDirectedGraph;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MasterPlan {
private final QueryId queryId;
private final QueryContext context;
private final LogicalPlan plan;
private ExecutionBlock root;
private AtomicInteger nextId = new AtomicInteger(0);
private ExecutionBlock terminalBlock;
private Map<ExecutionBlockId, ExecutionBlock> execBlockMap = new HashMap<ExecutionBlockId, ExecutionBlock>();
private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph =
new SimpleDirectedGraph<ExecutionBlockId, DataChannel>();
public ExecutionBlockId newExecutionBlockId() {
return new ExecutionBlockId(queryId, nextId.incrementAndGet());
}
public boolean isTerminal(ExecutionBlock execBlock) {
return terminalBlock.getId().equals(execBlock.getId());
}
public ExecutionBlock getTerminalBlock() {
return terminalBlock;
}
public ExecutionBlock createTerminalBlock() {
terminalBlock = newExecutionBlock();
return terminalBlock;
}
public MasterPlan(QueryId queryId, QueryContext context, LogicalPlan plan) {
this.queryId = queryId;
this.context = context;
this.plan = plan;
}
public QueryId getQueryId() {
return this.queryId;
}
public QueryContext getContext() {
return this.context;
}
public LogicalPlan getLogicalPlan() {
return this.plan;
}
public void setTerminal(ExecutionBlock root) {
this.root = root;
this.terminalBlock = root;
}
public ExecutionBlock getRoot() {
return this.root;
}
public ExecutionBlock newExecutionBlock() {
ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId());
execBlockMap.put(newExecBlock.getId(), newExecBlock);
return newExecBlock;
}
public boolean containsExecBlock(ExecutionBlockId execBlockId) {
return execBlockMap.containsKey(execBlockId);
}
public ExecutionBlock getExecBlock(ExecutionBlockId execBlockId) {
return execBlockMap.get(execBlockId);
}
public void removeExecBlock(ExecutionBlockId execBlockId) throws IllegalStateException {
List<DataChannel> channels = getIncomingChannels(execBlockId);
if (channels != null && channels.size() > 0) {
throw new IllegalStateException("Cannot remove execution blocks because some other execution blocks are connected");
}
channels = getOutgoingChannels(execBlockId);
if (channels != null && channels.size() > 0) {
throw new IllegalStateException("Cannot remove execution blocks because some other execution blocks are connected");
}
execBlockMap.remove(execBlockId);
}
public void addConnect(DataChannel dataChannel) {
execBlockGraph.addEdge(dataChannel.getSrcId(), dataChannel.getTargetId(), dataChannel);
}
public void addConnect(ExecutionBlock src, ExecutionBlock target, ShuffleType type) {
addConnect(src.getId(), target.getId(), type);
}
public void addConnect(ExecutionBlockId src, ExecutionBlockId target, ShuffleType type) {
addConnect(new DataChannel(src, target, type));
}
public boolean isConnected(ExecutionBlock src, ExecutionBlock target) {
return isConnected(src.getId(), target.getId());
}
public boolean isConnected(ExecutionBlockId src, ExecutionBlockId target) {
return execBlockGraph.hasEdge(src, target);
}
public boolean isReverseConnected(ExecutionBlock target, ExecutionBlock src) {
return execBlockGraph.hasReversedEdge(target.getId(), src.getId());
}
public boolean isReverseConnected(ExecutionBlockId target, ExecutionBlockId src) {
return execBlockGraph.hasReversedEdge(target, src);
}
public DataChannel getChannel(ExecutionBlock src, ExecutionBlock target) {
return execBlockGraph.getEdge(src.getId(), target.getId());
}
public DataChannel getChannel(ExecutionBlockId src, ExecutionBlockId target) {
return execBlockGraph.getEdge(src, target);
}
public List<DataChannel> getOutgoingChannels(ExecutionBlockId src) {
return execBlockGraph.getOutgoingEdges(src);
}
public boolean isRoot(ExecutionBlock execBlock) {
if (!execBlock.getId().equals(terminalBlock.getId())) {
return execBlockGraph.getParent(execBlock.getId(), 0).equals(terminalBlock.getId());
} else {
return false;
}
}
public boolean isLeaf(ExecutionBlock execBlock) {
return execBlockGraph.isLeaf(execBlock.getId());
}
public boolean isLeaf(ExecutionBlockId id) {
return execBlockGraph.isLeaf(id);
}
public List<DataChannel> getIncomingChannels(ExecutionBlockId target) {
return execBlockGraph.getIncomingEdges(target);
}
public void disconnect(ExecutionBlock src, ExecutionBlock target) {
disconnect(src.getId(), target.getId());
}
public void disconnect(ExecutionBlockId src, ExecutionBlockId target) {
execBlockGraph.removeEdge(src, target);
}
public ExecutionBlock getParent(ExecutionBlock executionBlock) {
return execBlockMap.get(execBlockGraph.getParent(executionBlock.getId(), 0));
}
public List<ExecutionBlock> getChilds(ExecutionBlock execBlock) {
return getChilds(execBlock.getId());
}
public List<ExecutionBlock> getChilds(ExecutionBlockId id) {
List<ExecutionBlock> childBlocks = new ArrayList<ExecutionBlock>();
for (ExecutionBlockId cid : execBlockGraph.getChilds(id)) {
childBlocks.add(execBlockMap.get(cid));
}
return childBlocks;
}
public int getChildCount(ExecutionBlockId blockId) {
return execBlockGraph.getChildCount(blockId);
}
public ExecutionBlock getChild(ExecutionBlockId execBlockId, int idx) {
return execBlockMap.get(execBlockGraph.getChild(execBlockId, idx));
}
public ExecutionBlock getChild(ExecutionBlock executionBlock, int idx) {
return getChild(executionBlock.getId(), idx);
}
public void accept(ExecutionBlockId v, DirectedGraphVisitor<ExecutionBlockId> visitor) {
execBlockGraph.accept(v, visitor);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
ExecutionBlockCursor cursor = new ExecutionBlockCursor(this);
sb.append("-------------------------------------------------------------------------------\n");
sb.append("Execution Block Graph (TERMINAL - " + getTerminalBlock() + ")\n");
sb.append("-------------------------------------------------------------------------------\n");
sb.append(execBlockGraph.toStringGraph(getRoot().getId()));
sb.append("-------------------------------------------------------------------------------\n");
ExecutionBlockCursor executionOrderCursor = new ExecutionBlockCursor(this, true);
sb.append("Order of Execution\n");
sb.append("-------------------------------------------------------------------------------");
int order = 1;
for (ExecutionBlock currentEB : executionOrderCursor) {
sb.append("\n").append(order).append(": ").append(currentEB.getId());
order++;
}
sb.append("\n-------------------------------------------------------------------------------\n");
for (ExecutionBlock block : executionOrderCursor) {
boolean terminal = false;
sb.append("\n");
sb.append("=======================================================\n");
sb.append("Block Id: " + block.getId());
if (isTerminal(block)) {
sb.append(" [TERMINAL]");
terminal = true;
} else if (isRoot(block)) {
sb.append(" [ROOT]");
} else if (isLeaf(block)) {
sb.append(" [LEAF]");
} else {
sb.append(" [INTERMEDIATE]");
}
sb.append("\n");
sb.append("=======================================================\n");
if (terminal) {
continue;
}
if (!isLeaf(block)) {
sb.append("\n[Incoming]\n");
for (DataChannel channel : getIncomingChannels(block.getId())) {
sb.append(channel);
if (block.getUnionScanMap().containsKey(channel.getSrcId())) {
sb.append(", union delegated scan: ").append(block.getUnionScanMap().get(channel.getSrcId()));
}
sb.append("\n");
}
}
if (!isRoot(block)) {
sb.append("\n[Outgoing]\n");
for (DataChannel channel : getOutgoingChannels(block.getId())) {
sb.append(channel);
sb.append("\n");
}
}
if (block.getEnforcer().getProperties().size() > 0) {
sb.append("\n[Enforcers]\n");
int i = 0;
List<EnforceProperty> enforceProperties = block.getEnforcer().getProperties();
Collections.sort(enforceProperties, new Comparator<EnforceProperty>() {
@Override
public int compare(EnforceProperty o1, EnforceProperty o2) {
return o1.toString().compareTo(o2.toString());
}
});
for (EnforceProperty enforce : enforceProperties) {
sb.append(" ").append(i++).append(": ");
sb.append(Enforcer.toString(enforce));
sb.append("\n");
}
}
sb.append("\n").append(PlannerUtil.buildExplainString(block.getPlan()));
}
return sb.toString();
}
}