blob: c6864b9a0c6b856cdc3e9e9f37721d0fd8c1f96a [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.global;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.SessionVars;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
* This class is a pointer to an ExecutionBlock that the query engine should execute.
*/
public class ExecutionBlockCursor implements Iterable<ExecutionBlock> {
private MasterPlan masterPlan;
private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
private List<BuildOrderItem> executionOrderedBlocks = new ArrayList<BuildOrderItem>();
private List<BuildOrderItem> notOrderedSiblingBlocks = new ArrayList<BuildOrderItem>();
private Map<ExecutionBlockId, AtomicInteger> orderRequiredChildCountMap = new HashMap<ExecutionBlockId, AtomicInteger>();
public ExecutionBlockCursor(MasterPlan plan) {
this(plan, false);
}
public ExecutionBlockCursor(MasterPlan plan, boolean siblingFirstOrder) {
this.masterPlan = plan;
if (siblingFirstOrder) {
buildSiblingFirstOrder(plan.getRoot());
} else {
buildDepthFirstOrder(plan.getRoot());
}
}
@Override
public Iterator<ExecutionBlock> iterator() {
return orderedBlocks.iterator();
}
public int size() {
return orderedBlocks.size();
}
public ExecutionQueue newCursor() {
int parallel = masterPlan.getContext().getInt(SessionVars.QUERY_EXECUTE_PARALLEL);
if (parallel > 1) {
return new ParallelExecutionQueue(masterPlan, parallel);
}
return new SimpleExecutionQueue();
}
public class SimpleExecutionQueue implements ExecutionQueue {
private final Iterator<ExecutionBlock> iterator = iterator();
private ExecutionBlock last;
@Override
public int size() {
return ExecutionBlockCursor.this.size();
}
@Override
public ExecutionBlock[] first() {
return iterator.hasNext() ? next(null) : null;
}
@Override
public ExecutionBlock[] next(ExecutionBlockId blockId) {
return iterator.hasNext() ? new ExecutionBlock[]{last = iterator.next()} : null;
}
public String toString() {
StringBuilder sb = new StringBuilder();
for (ExecutionBlock block : ExecutionBlockCursor.this) {
if (sb.length() > 0) {
sb.append(',');
}
if (block == last) {
sb.append('(');
}
sb.append(block.getId().getId());
if (block == last) {
sb.append(')');
}
}
return sb.toString();
}
}
// Add all execution blocks in a depth first and postfix order
private void buildDepthFirstOrder(ExecutionBlock current) {
if (!masterPlan.isLeaf(current.getId())) {
for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
buildDepthFirstOrder(execBlock);
}
}
orderedBlocks.add(current);
}
private void buildSiblingFirstOrder(ExecutionBlock current) {
/*
|-eb_1404887024677_0004_000007
|-eb_1404887024677_0004_000006
|-eb_1404887024677_0004_000005
|-eb_1404887024677_0004_000004
|-eb_1404887024677_0004_000003
|-eb_1404887024677_0004_000002
|-eb_1404887024677_0004_000001
In the case of the upper plan, buildDepthFirstOrder() makes the following order in a depth first and postfix order.
[eb_1, eb_2, eb_3, eb_4, eb_5, eb_6, eb_7]
The eb_2 doesn't know eb_3's output bytes and uses a size of eb_4's all scan nodes.
buildSiblingFirstOrder() makes the following order in a sibling order.
[eb_1, eb_3, eb_2, eb_4, eb_5, eb_6, eb_7]
In this order the eb_2 knows eb_3's output bytes and the eb_4 also knows eb_1's output bytes.
*/
preExecutionOrder(new BuildOrderItem(null, current));
for (BuildOrderItem eachItem: executionOrderedBlocks) {
if (masterPlan.isLeaf(eachItem.eb.getId())) {
orderedBlocks.add(eachItem.eb);
orderRequiredChildCountMap.get(eachItem.parentEB.getId()).decrementAndGet();
} else {
if (eachItem.allSiblingsOrdered()) {
for (BuildOrderItem eachSiblingItem: notOrderedSiblingBlocks) {
orderedBlocks.add(eachSiblingItem.eb);
}
orderedBlocks.add(eachItem.eb);
notOrderedSiblingBlocks.clear();
} else {
notOrderedSiblingBlocks.add(eachItem);
}
}
}
}
private void preExecutionOrder(BuildOrderItem current) {
Stack<BuildOrderItem> stack = new Stack<BuildOrderItem>();
if (!masterPlan.isLeaf(current.eb.getId())) {
List<ExecutionBlock> children = masterPlan.getChilds(current.eb);
orderRequiredChildCountMap.put(current.eb.getId(), new AtomicInteger(children.size()));
for (ExecutionBlock execBlock : children) {
BuildOrderItem item = new BuildOrderItem(current.eb, execBlock);
item.setSiblings(children);
if (!masterPlan.isLeaf(execBlock)) {
preExecutionOrder(item);
} else {
stack.push(item);
}
}
for (BuildOrderItem eachItem : stack) {
preExecutionOrder(eachItem);
}
}
executionOrderedBlocks.add(current);
}
class BuildOrderItem {
ExecutionBlock eb;
ExecutionBlock parentEB;
List<ExecutionBlockId> siblings = new ArrayList<ExecutionBlockId>();
BuildOrderItem(ExecutionBlock parentEB, ExecutionBlock eb) {
this.parentEB = parentEB;
this.eb = eb;
}
public void setSiblings(List<ExecutionBlock> siblings) {
for (ExecutionBlock eachEB: siblings) {
if (eachEB.getId().equals(eb.getId())) {
continue;
}
this.siblings.add(eachEB.getId());
}
}
public boolean allSiblingsOrdered() {
for (ExecutionBlockId eachSibling: siblings) {
if (orderRequiredChildCountMap.get(eachSibling) != null &&
orderRequiredChildCountMap.get(eachSibling).get() > 0) {
return false;
}
}
return true;
}
@Override
public String toString() {
return eb.toString();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof BuildOrderItem)) {
return false;
}
return eb.equals(((BuildOrderItem) obj).eb);
}
@Override
public int hashCode() {
int result = eb != null ? eb.hashCode() : 0;
result = 31 * result + (parentEB != null ? parentEB.hashCode() : 0);
result = 31 * result + (siblings != null ? siblings.hashCode() : 0);
return result;
}
}
}