blob: f199be7e8dc52a4cd5c4bc6065c2817e5ff94f04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.global;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.LogicalPlan.PIDFactory;
import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.EdgeType;
import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.json.GsonObject;
import java.util.*;
import java.util.Map.Entry;
/**
* ExecutionPlan is a DAG of logical nodes.
* If there are two input sources, the plan should include Join or Union.
* The terminalNode is used as the start position of the traversal, because there are multiple output destinations.
*/
public class ExecutionPlan implements GsonObject {
@Expose private PIDFactory pidFactory;
@Expose private InputContext inputContext;
@Expose private boolean hasUnionPlan;
@Expose private boolean hasJoinPlan;
@Expose private LogicalRootNode terminalNode;
private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
= new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
@Expose private NavigableMap<Integer, PlanGroup> planGroups = Maps.newTreeMap();
private ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
private final ExecutionPlanComparator comparator = new ExecutionPlanComparator();
public static class PlanGroup {
@Expose private int rootPID;
@Expose private List<LogicalNode> nodes = Lists.newArrayList(); // order: root -> leaf
public PlanGroup(int rootPID) {
setId(rootPID);
}
public void setId(int rootPID) {
this.rootPID = rootPID;
}
public int getId() {
return rootPID;
}
public void addNodeAndDescendants(LogicalNode logicalNode) {
add(logicalNode);
if (logicalNode instanceof UnaryNode) {
addNodeAndDescendants(((UnaryNode) logicalNode).getChild());
} else if (logicalNode instanceof BinaryNode) {
addNodeAndDescendants(((BinaryNode) logicalNode).getLeftChild());
addNodeAndDescendants(((BinaryNode) logicalNode).getRightChild());
} else if (logicalNode instanceof TableSubQueryNode) {
addNodeAndDescendants(((TableSubQueryNode) logicalNode).getSubQuery());
}
}
public void add(LogicalNode logicalNode) {
nodes.add(logicalNode);
}
public LogicalNode toLinkedLogicalNode() {
LogicalNode[] nodes = this.nodes.toArray(new LogicalNode[this.nodes.size()]);
toLinkedLogicalNode(nodes, 0);
return nodes[0];
}
private int toLinkedLogicalNode(LogicalNode[] nodes, int i) {
if (i < nodes.length) {
if (nodes[i] instanceof UnaryNode) {
((UnaryNode)nodes[i]).setChild(nodes[i+1]);
return toLinkedLogicalNode(nodes, i + 1);
} else if (nodes[i] instanceof BinaryNode) {
((BinaryNode)nodes[i]).setLeftChild(nodes[i+1]);
int rightChildIndex = toLinkedLogicalNode(nodes, i+1);
((BinaryNode)nodes[i]).setRightChild(nodes[rightChildIndex]);
return toLinkedLogicalNode(nodes, rightChildIndex);
} else if (nodes[i] instanceof TableSubQueryNode) {
((TableSubQueryNode)nodes[i]).setSubQuery(nodes[i+1]);
return toLinkedLogicalNode(nodes, i+1);
} else {
return i+1;
}
} else {
return -1;
}
}
public LogicalNode getRootNode() {
return nodes.get(0);
}
public LogicalNode getLeafNode() {
return nodes.get(nodes.size()-1);
}
public void clear() {
nodes.clear();
}
}
private ExecutionPlan(PIDFactory pidFactory, LogicalRootNode terminalNode) {
this.pidFactory = pidFactory;
this.terminalNode = terminalNode;
}
public ExecutionPlan(PIDFactory pidFactory) {
this(pidFactory, new LogicalRootNode(pidFactory.newPID()));
}
public void setPlan(LogicalNode plan) {
this.clear();
this.addPlan(plan);
}
public void clear() {
for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
graph.removeEdge(edge.getChildId(), edge.getParentId());
}
planGroups.clear();
vertices.clear();
this.inputContext = null;
this.hasUnionPlan = false;
this.hasJoinPlan = false;
}
public void addPlan(LogicalNode plan) {
LogicalNode topNode = plan;
if (topNode.getType() == NodeType.ROOT) {
topNode = ((LogicalRootNode)topNode).getChild();
}
SameNodeChecker sameNodeChecker = new SameNodeChecker();
sameNodeChecker.visit(topNode);
LogicalNode sameNode = getSameNode(topNode);
if (sameNode != null) {
connectChild(topNode, sameNode);
topNode = sameNode;
}
// add group
PlanGroup planGroup = new PlanGroup(topNode.getPID());
planGroup.addNodeAndDescendants(topNode);
planGroups.put(planGroup.rootPID, planGroup);
topNode = planGroup.getRootNode();
builder.visit(topNode);
this.add(topNode, terminalNode, EdgeType.SINGLE);
}
private LogicalNode getSameNode(LogicalNode node) {
for (Entry<Integer, LogicalNode> eachEntry : vertices.entrySet()) {
if (node.equals(eachEntry.getValue())) {
return eachEntry.getValue();
}
}
return null;
}
private class SameNodeChecker implements LogicalNodeVisitor {
@Override
public void visit(LogicalNode node) {
LogicalNode sameNode;
if (node instanceof UnaryNode) {
UnaryNode unaryNode = (UnaryNode) node;
sameNode = getSameNode(unaryNode.getChild());
if (sameNode != null) {
connectChild(unaryNode.getChild(), sameNode);
unaryNode.setChild(sameNode);
}
} else if (node instanceof BinaryNode) {
BinaryNode binaryNode = (BinaryNode) node;
sameNode = getSameNode(binaryNode.getLeftChild());
if (sameNode != null) {
connectChild(binaryNode.getLeftChild(), sameNode);
binaryNode.setLeftChild(sameNode);
}
sameNode = getSameNode(binaryNode.getRightChild());
if (sameNode != null) {
connectChild(binaryNode.getRightChild(), sameNode);
binaryNode.setRightChild(sameNode);
}
} else if (node instanceof TableSubQueryNode) {
TableSubQueryNode tableSubQueryNode = (TableSubQueryNode) node;
sameNode = getSameNode(tableSubQueryNode.getSubQuery());
if (sameNode != null) {
connectChild(tableSubQueryNode.getSubQuery(), sameNode);
tableSubQueryNode.setSubQuery(sameNode);
}
}
}
}
private void connectChild(LogicalNode originNode, LogicalNode replaceNode) {
if (replaceNode instanceof UnaryNode) {
((UnaryNode) replaceNode).setChild(((UnaryNode)originNode).getChild());
} else if (replaceNode instanceof BinaryNode) {
((BinaryNode) replaceNode).setLeftChild(((BinaryNode) originNode).getLeftChild());
((BinaryNode) replaceNode).setRightChild(((BinaryNode) originNode).getRightChild());
} else if (replaceNode instanceof TableSubQueryNode) {
((TableSubQueryNode) replaceNode).setSubQuery(((TableSubQueryNode) originNode).getSubQuery());
}
}
public PlanGroup remoteLogicalNodeGroup(int pid) {
// TODO: improve the code
PlanGroup removed = planGroups.remove(pid);
Collection<PlanGroup> remain = planGroups.values();
clear();
for (PlanGroup planGroup : remain) {
addPlan(planGroup.toLinkedLogicalNode());
}
return removed;
}
public PlanGroup getPlanGroupWithPID(int pid) {
return planGroups.get(pid);
}
public PlanGroup getFirstPlanGroup() {
return planGroups.firstEntry().getValue();
}
public boolean hasPlanGroup() {
return planGroups.size() > 0;
}
public void add(LogicalNode child, LogicalNode parent, EdgeType edgeType) {
vertices.put(child.getPID(), child);
vertices.put(parent.getPID(), parent);
graph.addEdge(child.getPID(), parent.getPID(), new ExecutionPlanEdge(child, parent, edgeType));
}
public void setInputContext(InputContext contexts) {
this.inputContext = contexts;
}
public boolean hasJoinPlan() {
return this.hasJoinPlan;
}
public boolean hasUnionPlan() {
return this.hasUnionPlan;
}
public LogicalRootNode getTerminalNode() {
return terminalNode;
}
public InputContext getInputContext() {
return inputContext;
}
public String toString() {
return graph.toStringGraph(terminalNode.getPID());
}
public EdgeType getEdgeType(LogicalNode child, LogicalNode parent) {
return graph.getEdge(child.getPID(), parent.getPID()).getEdgeType();
}
public LogicalNode getChild(LogicalNode parent, EdgeType edgeType) {
List<ExecutionPlanEdge> incomingEdges = graph.getIncomingEdges(parent.getPID());
for (ExecutionPlanEdge inEdge : incomingEdges) {
if (inEdge.getEdgeType() == edgeType) {
return vertices.get(inEdge.getChildId());
}
}
return null;
}
@Override
public String toJson() {
return CoreGsonHelper.toJson(this, ExecutionPlan.class);
}
public Schema getOutSchema(int i) {
return vertices.get(graph.getChild(terminalNode.getPID(), i)).getOutSchema();
}
public Schema getOutSchemaWithPID(int pid) {
return planGroups.get(pid).getRootNode().getOutSchema();
}
@Override
public boolean equals(Object o) {
if (o instanceof ExecutionPlan) {
ExecutionPlan other = (ExecutionPlan) o;
boolean eq = this.hasJoinPlan == other.hasJoinPlan;
eq &= this.hasUnionPlan == other.hasUnionPlan;
eq &= this.terminalNode.equals(other.terminalNode);
eq &= this.inputContext.equals(other.inputContext);
if (!eq) {
return false;
}
eq &= comparator.compare(this, other);
return eq;
}
return false;
}
public LogicalNode getTopNode(int index) {
return vertices.get(getTopNodePid(index));
}
public int getTopNodePid(int index) {
return graph.getChild(terminalNode.getPID(), index);
}
public LogicalNode getTopNodeFromPID(int pid) {
if (planGroups.containsKey(pid)) {
return planGroups.get(pid).getRootNode();
} else {
return null;
}
}
public int getChildCount(LogicalNode node) {
return graph.getChildCount(node.getPID());
}
public LogicalNode getChild(LogicalNode node, int i) {
return vertices.get(graph.getChild(node.getPID(), i));
}
public int getParentCount(LogicalNode node) {
return graph.getParentCount(node.getPID());
}
public LogicalNode getParent(LogicalNode node, int i) {
return vertices.get(graph.getParent(node.getPID(), i));
}
public List<LogicalNode> getChilds(LogicalNode node) {
List<LogicalNode> childs = new ArrayList<LogicalNode>();
for (Integer childId : graph.getChilds(node.getPID())) {
childs.add(vertices.get(childId));
}
return childs;
}
public void remove(LogicalNode child, LogicalNode parent) {
this.graph.removeEdge(child.getPID(), parent.getPID());
}
private static class PIDAndEdgeType {
@Expose int id;
@Expose EdgeType edgeType;
public PIDAndEdgeType(int id, EdgeType edgeType) {
this.id = id;
this.edgeType = edgeType;
}
}
public static class ExecutionPlanJsonHelper implements GsonObject {
@Expose private final PIDFactory pidFactory;
@Expose private final boolean hasJoinPlan;
@Expose private final boolean hasUnionPlan;
@Expose private final InputContext inputContext;
@Expose private final LogicalRootNode terminalNode;
@Expose private final Map<Integer, PlanGroup> planGroups;
public ExecutionPlanJsonHelper(ExecutionPlan plan) {
this.pidFactory = plan.pidFactory;
this.hasJoinPlan = plan.hasJoinPlan;
this.hasUnionPlan = plan.hasUnionPlan;
this.inputContext = plan.inputContext;
this.terminalNode = plan.terminalNode;
this.planGroups = plan.planGroups;
}
@Override
public String toJson() {
return CoreGsonHelper.toJson(this, ExecutionPlanJsonHelper.class);
}
public ExecutionPlan toExecutionPlan() {
ExecutionPlan plan = new ExecutionPlan(this.pidFactory, this.terminalNode);
plan.hasJoinPlan = this.hasJoinPlan;
plan.hasUnionPlan = this.hasUnionPlan;
plan.setInputContext(this.inputContext);
for (PlanGroup planGroup : planGroups.values()) {
plan.addPlan(planGroup.toLinkedLogicalNode());
}
return plan;
}
}
private static class ExecutionPlanComparator {
boolean equal = true;
private boolean recursiveCompare(ExecutionPlan plan1, ExecutionPlan plan2,
Stack<Integer> s1, Stack<Integer> s2) {
Integer l1 = s1.pop();
Integer l2 = s2.pop();
if (l1.equals(l2)) {
if (plan1.graph.getChildCount(l1) == plan2.graph.getChildCount(l2)) {
if (plan1.graph.getChildCount(l1) > 0
&& plan2.graph.getChildCount(l2) > 0) {
for (Integer child : plan1.graph.getChilds(l1)) {
s1.push(child);
}
for (Integer child : plan2.graph.getChilds(l2)) {
s2.push(child);
}
return recursiveCompare(plan1, plan2, s1, s2);
} else {
equal &= true;
}
} else {
equal = false;
}
} else {
equal = false;
}
return equal;
}
public boolean compare(ExecutionPlan plan1, ExecutionPlan plan2) {
if (plan1.getChildCount(plan1.terminalNode)
== plan2.getChildCount(plan2.terminalNode)) {
Stack<Integer> s1 = new Stack<Integer>();
Stack<Integer> s2 = new Stack<Integer>();
int childCount = plan1.getChildCount(plan1.terminalNode);
for (int i = 0; i < childCount; i++) {
s1.push(plan1.getTopNode(i).getPID());
s2.push(plan2.getTopNode(i).getPID());
}
return recursiveCompare(plan1, plan2, s1, s2);
} else {
return false;
}
}
}
private static class ExecutionPlanBuilder implements LogicalNodeVisitor {
private ExecutionPlan plan;
public ExecutionPlanBuilder(ExecutionPlan plan) {
this.plan = plan;
}
@Override
public void visit(LogicalNode current) {
try {
visit(current, EdgeType.SINGLE);
} catch (PlanningException e) {
throw new RuntimeException(e);
}
}
private void visit(LogicalNode current, EdgeType edgeType) throws PlanningException {
if (current instanceof UnaryNode) {
visitUnary((UnaryNode) current, edgeType);
} else if (current instanceof BinaryNode) {
visitBinary((BinaryNode) current, edgeType);
} else if (current instanceof ScanNode) {
visitScan((ScanNode) current, edgeType);
} else if (current instanceof TableSubQueryNode) {
visitTableSubQuery((TableSubQueryNode) current, edgeType);
}
}
private void visitScan(ScanNode node, EdgeType edgeType) throws PlanningException {
if (plan.inputContext == null) {
plan.inputContext = new InputContext();
}
plan.inputContext.addScanNode(node);
}
private void visitUnary(UnaryNode node, EdgeType edgeType) throws PlanningException {
if (node.getChild() != null) {
LogicalNode child = node.getChild();
plan.add(child, node, edgeType);
node.setChild(null);
visit(child, edgeType);
}
}
private void visitBinary(BinaryNode node, EdgeType edgeType) throws PlanningException {
Preconditions.checkArgument(edgeType == EdgeType.SINGLE);
LogicalNode child;
if (node.getType() == NodeType.JOIN) {
plan.hasJoinPlan = true;
} else if (node.getType() == NodeType.UNION) {
plan.hasUnionPlan = true;
}
if (node.getLeftChild() != null) {
child = node.getLeftChild();
plan.add(child, node, EdgeType.LEFT);
node.setLeftChild(null);
visit(child, EdgeType.LEFT);
}
if (node.getRightChild() != null) {
child = node.getRightChild();
plan.add(child, node, EdgeType.RIGHT);
node.setRightChild(null);
visit(child, EdgeType.RIGHT);
}
}
private void visitTableSubQuery(TableSubQueryNode node, EdgeType edgeType) throws PlanningException {
LogicalNode child = node.getSubQuery();
plan.add(child, node, edgeType);
node.nullifySubQuery();
visit(child, edgeType);
}
}
}