blob: d197bda7ef913407c1399d7943dcb03ebfb1b887 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.global;
import com.google.common.collect.Iterables;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Stack;
public class ParallelExecutionQueue implements ExecutionQueue, Iterable<ExecutionBlock> {
private static final Log LOG = LogFactory.getLog(ParallelExecutionQueue.class);
private final int maximum;
private final MasterPlan masterPlan;
private final List<Deque<ExecutionBlock>> executable;
private final Set<ExecutionBlockId> executed = new HashSet<>();
public ParallelExecutionQueue(MasterPlan masterPlan, int maximum) {
this.masterPlan = masterPlan;
this.maximum = maximum;
this.executable = toStacks(masterPlan.getRoot());
}
private List<Deque<ExecutionBlock>> toStacks(ExecutionBlock root) {
List<Deque<ExecutionBlock>> stacks = new ArrayList<>();
toStacks(root, stacks, new ArrayList<>());
return stacks;
}
// currently, diamond shaped DAG is not supported in tajo
private void toStacks(ExecutionBlock current, List<Deque<ExecutionBlock>> queues,
List<ExecutionBlock> stack) {
stack.add(current);
if (masterPlan.isLeaf(current.getId())) {
queues.add(new ArrayDeque<>(stack));
} else {
List<ExecutionBlock> children = masterPlan.getChilds(current);
for (int i = 0; i < children.size(); i++) {
toStacks(children.get(i), queues, i == 0 ? stack : new Stack<>());
}
}
}
@Override
public synchronized int size() {
int size = 0;
for (Deque<ExecutionBlock> queue : executable) {
size += queue.size();
}
return size;
}
@Override
public synchronized ExecutionBlock[] first() {
int max = Math.min(maximum, executable.size());
List<ExecutionBlock> result = new ArrayList<>();
for (Deque<ExecutionBlock> queue : executable) {
if (result.size() < max && isExecutableNow(queue.peekLast())) {
result.add(queue.removeLast());
}
}
LOG.info("Initial executable blocks " + result);
return result.toArray(new ExecutionBlock[result.size()]);
}
@Override
public synchronized ExecutionBlock[] next(ExecutionBlockId doneNow) {
executed.add(doneNow);
int remaining = 0;
for (Deque<ExecutionBlock> queue : executable) {
if (!queue.isEmpty() && isExecutableNow(queue.peekLast())) {
LOG.info("Next executable block " + queue.peekLast());
return new ExecutionBlock[]{queue.removeLast()};
}
remaining += queue.size();
}
return remaining > 0 ? new ExecutionBlock[0] : null;
}
private boolean isExecutableNow(ExecutionBlock current) {
for (ExecutionBlock child : masterPlan.getChilds(current)) {
if (!executed.contains(child.getId())) {
return false; // there's something should be done before this
}
}
return true;
}
@Override
public Iterator<ExecutionBlock> iterator() {
return Iterables.concat(executable).iterator();
}
}