blob: 1ffeb4ac989c87dde0cbe4ad1ec3cc8352ea8503 [file] [log] [blame]
package org.apache.rya.rdftriplestore.evaluation;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
/**
*/
public class ParallelJoinIterator extends LookAheadIteration<BindingSet, QueryEvaluationException> {
public static final EmptyBindingSet EMPTY_BINDING_SET = new EmptyBindingSet();
private final EvaluationStrategy strategy;
private final Join join;
private final CloseableIteration<BindingSet, QueryEvaluationException> leftIter;
private ExecutorService executorService;
private Queue<ParallelIteratorWork> workQueue = new LinkedBlockingQueue<ParallelIteratorWork>();
private ParallelIteratorWork currentWork;
private int batch;
public ParallelJoinIterator(EvaluationStrategy strategy, Join join, BindingSet bindings, ExecutorService executorService, int batch)
throws QueryEvaluationException {
this.strategy = strategy;
this.join = join;
leftIter = strategy.evaluate(join.getLeftArg(), bindings);
this.executorService = executorService;
this.batch = batch;
}
@Override
protected BindingSet getNextElement() throws QueryEvaluationException {
try {
while (leftIter.hasNext() || !workQueue.isEmpty() || currentWork != null) {
if (!workQueue.isEmpty() && currentWork == null) {
currentWork = workQueue.poll();
}
if (currentWork != null) {
BindingSet bindingSet = currentWork.queue.poll();
if (EMPTY_BINDING_SET.equals(bindingSet)) {
currentWork = null;
continue;
} else if (bindingSet == null) {
continue;
}
return bindingSet;
}
try {
for (int i = 0; i < batch; i++) {
if (leftIter.hasNext()) {
ParallelIteratorWork work = new ParallelIteratorWork(leftIter.next(), join.getRightArg());
workQueue.add(work);
executorService.execute(work);
} else
break;
}
} catch (NoSuchElementException ignore) {
}
}
} catch (Exception e) {
throw new QueryEvaluationException(e);
}
return null;
}
@Override
protected void handleClose() throws QueryEvaluationException {
try {
super.handleClose();
leftIter.close();
// rightIter.close();
} catch (Exception e) {
throw new QueryEvaluationException(e);
}
}
private class ParallelIteratorWork implements Runnable {
private BindingSet leftBindingSet;
private TupleExpr rightTupleExpr;
public LinkedBlockingQueue<BindingSet> queue = new LinkedBlockingQueue<BindingSet>();
private ParallelIteratorWork(BindingSet leftBindingSet, TupleExpr rightTupleExpr) {
this.leftBindingSet = leftBindingSet;
this.rightTupleExpr = rightTupleExpr;
}
@Override
public void run() {
try {
CloseableIteration<BindingSet, QueryEvaluationException> iter = strategy.evaluate(rightTupleExpr, leftBindingSet);
while (iter.hasNext()) {
queue.add(iter.next());
}
queue.add(EMPTY_BINDING_SET);
iter.close();
} catch (QueryEvaluationException e) {
throw new RuntimeException(e);
}
}
}
}