blob: 1815e1a306003ba9f3e70b2e176f3298ed9db1b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.sparql.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.NodeFactory;
import org.apache.jena.query.* ;
import org.apache.jena.rdf.model.Model ;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Property ;
import org.apache.jena.rdf.model.Resource ;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.DatasetGraphFactory;
import org.apache.jena.sparql.exec.QueryExec;
import org.apache.jena.sparql.function.FunctionRegistry ;
import org.apache.jena.sparql.function.library.wait ;
import org.apache.jena.sparql.graph.GraphFactory ;
import org.apache.jena.sparql.sse.SSE;
import org.junit.AfterClass ;
import org.junit.Assert;
import org.junit.BeforeClass ;
import org.junit.Test ;
public class TestQueryExecutionCancel {
private static final String ns = "http://example/ns#" ;
static Model m = GraphFactory.makeJenaDefaultModel() ;
static Resource r1 = m.createResource() ;
static Property p1 = m.createProperty(ns+"p1") ;
static Property p2 = m.createProperty(ns+"p2") ;
static Property p3 = m.createProperty(ns+"p3") ;
static {
m.add(r1, p1, "x1") ;
m.add(r1, p2, "X2") ; // NB Capital
m.add(r1, p3, "y1") ;
}
@BeforeClass public static void beforeClass() { FunctionRegistry.get().put(ns + "wait", wait.class) ; }
@AfterClass public static void afterClass() { FunctionRegistry.get().remove(ns + "wait") ; }
@Test(expected=QueryCancelledException.class)
public void test_Cancel_API_1()
{
try(QueryExecution qExec = makeQExec("SELECT * {?s ?p ?o}")) {
ResultSet rs = qExec.execSelect() ;
assertTrue(rs.hasNext()) ;
qExec.abort();
assertTrue(rs.hasNext()) ;
rs.nextSolution();
assertFalse("Results not expected after cancel.", rs.hasNext()) ;
}
}
@Test(expected=QueryCancelledException.class)
public void test_Cancel_API_2()
{
try(QueryExecution qExec = makeQExec("PREFIX ex: <" + ns + "> SELECT * {?s ?p ?o . FILTER ex:wait(100) }")) {
ResultSet rs = qExec.execSelect() ;
assertTrue(rs.hasNext()) ;
qExec.abort();
assertTrue(rs.hasNext()) ;
rs.nextSolution();
assertFalse("Results not expected after cancel.", rs.hasNext()) ;
}
}
@Test public void test_Cancel_API_3() throws InterruptedException
{
// Don't qExec.close on this thread.
QueryExecution qExec = makeQExec("PREFIX ex: <" + ns + "> SELECT * { ?s ?p ?o . FILTER ex:wait(100) }") ;
CancelThreadRunner thread = new CancelThreadRunner(qExec);
thread.start();
synchronized (qExec) { qExec.wait() ; }
synchronized (qExec) { qExec.abort() ;}
synchronized (qExec) { qExec.notify() ; }
assertEquals (1, thread.getCount()) ;
}
@Test public void test_Cancel_API_4() throws InterruptedException
{
// Don't qExec.close on this thread.
QueryExecution qExec = makeQExec("PREFIX ex: <" + ns + "> SELECT * { ?s ?p ?o } ORDER BY ex:wait(100)") ;
CancelThreadRunner thread = new CancelThreadRunner(qExec);
thread.start();
synchronized (qExec) { qExec.wait() ; }
synchronized (qExec) { qExec.abort(); }
synchronized (qExec) { qExec.notify() ; }
assertEquals (1, thread.getCount()) ;
}
@Test(expected = QueryCancelledException.class)
public void test_Cancel_API_5() {
try (QueryExecution qe = QueryExecutionFactory.create("SELECT * { ?s ?p ?o }", m)) {
qe.abort();
ResultSetFormatter.consume(qe.execSelect());
}
}
private QueryExecution makeQExec(String queryString)
{
Query q = QueryFactory.create(queryString) ;
QueryExecution qExec = QueryExecutionFactory.create(q, m) ;
return qExec ;
}
class CancelThreadRunner extends Thread
{
private QueryExecution qExec = null ;
private int count = 0 ;
public CancelThreadRunner(QueryExecution qExec)
{
this.qExec = qExec ;
}
@Override
public void run()
{
try
{
ResultSet rs = qExec.execSelect() ;
while ( rs.hasNext() )
{
rs.nextSolution() ;
count++ ;
synchronized (qExec) { qExec.notify() ; }
synchronized (qExec) { qExec.wait() ; }
}
}
catch (QueryCancelledException e) {}
catch (InterruptedException e) {
e.printStackTrace();
} finally { qExec.close() ; }
}
public int getCount()
{
return count ;
}
}
@Test
public void test_cancel_select_1() {
cancellationTest("SELECT * {}", QueryExec::select);
}
@Test
public void test_cancel_select_2() {
cancellationTest("SELECT * {}", QueryExec::select, Iterator::hasNext);
}
@Test
public void test_cancel_ask() {
cancellationTest("ASK {}", QueryExec::ask);
}
@Test
public void test_cancel_construct() {
cancellationTest("CONSTRUCT WHERE {}", QueryExec::construct);
}
@Test
public void test_cancel_describe() {
cancellationTest("DESCRIBE * {}", QueryExec::describe);
}
@Test
public void test_cancel_construct_dataset() {
cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructDataset);
}
@Test
public void test_cancel_construct_triples_1() {
cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructTriples, Iterator::hasNext);
}
@Test
public void test_cancel_construct_triples_2() {
cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructTriples);
}
@Test
public void test_cancel_construct_quads_1() {
cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructQuads, Iterator::hasNext);
}
@Test
public void test_cancel_construct_quads_2() {
cancellationTest("CONSTRUCT{} WHERE{}", QueryExec::constructQuads);
}
@Test
public void test_cancel_json() {
cancellationTest("JSON {\":a\": \"b\"} WHERE {}", exec->exec.execJson().get(0));
}
static <T> void cancellationTest(String queryString, Function<QueryExec, Iterator<T>> itFactory, Consumer<Iterator<T>> itConsumer) {
cancellationTest(queryString, itFactory::apply);
cancellationTestForIterator(queryString, itFactory, itConsumer);
}
/** Abort the query exec and expect all execution methods to fail */
static void cancellationTest(String queryString, Consumer<QueryExec> execAction) {
DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
dsg.add(SSE.parseQuad("(_ :s :p :o)"));
try(QueryExec aExec = QueryExec.dataset(dsg).query(queryString).build()) {
aExec.abort();
assertThrows(QueryCancelledException.class, ()-> execAction.accept(aExec));
}
}
/** Obtain an iterator and only afterwards abort the query exec.
* Operations on the iterator are now expected to fail. */
static <T> void cancellationTestForIterator(String queryString, Function<QueryExec, Iterator<T>> itFactory, Consumer<Iterator<T>> itConsumer) {
DatasetGraph dsg = DatasetGraphFactory.createTxnMem();
dsg.add(SSE.parseQuad("(_ :s :p :o)"));
try(QueryExec aExec = QueryExec.dataset(dsg).query(queryString).build()) {
Iterator<T> it = itFactory.apply(aExec);
aExec.abort();
assertThrows(QueryCancelledException.class, ()-> itConsumer.accept(it));
}
}
/**
* Test that creates iterators over a billion result rows and attempts to cancel them.
* If this test hangs then it is likely that something went wrong in the cancellation machinery.
*/
@Test(timeout = 10000)
public void test_cancel_concurrent_1() {
int maxCancelDelayInMillis = 100;
int cpuCount = Runtime.getRuntime().availableProcessors();
// Spend at most roughly 1 second per cpu (10 tasks a max 100ms)
int taskCount = cpuCount * 10;
// Create a model with 1000 triples
Graph graph = GraphFactory.createDefaultGraph();
IntStream.range(0, 1000)
.mapToObj(i -> NodeFactory.createURI("http://www.example.org/r" + i))
.forEach(node -> graph.add(node, node, node));
Model model = ModelFactory.createModelForGraph(graph);
// Create a query that creates 3 cross joins - resulting in one billion result rows
Query query = QueryFactory.create("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }");
Callable<QueryExecution> qeFactory = () -> QueryExecutionFactory.create(query, model);
runConcurrentAbort(taskCount, maxCancelDelayInMillis, qeFactory, TestQueryExecutionCancel::doCount);
}
private static final int doCount(QueryExecution qe) {
try (QueryExecution qe2 = qe) {
ResultSet rs = qe2.execSelect();
int size = ResultSetFormatter.consume(rs);
return size;
}
}
/**
* Reusable method that creates a parallel stream that starts query executions
* and schedules cancel tasks on a separate thread pool.
*/
public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callable<QueryExecution> qeFactory, Function<QueryExecution, ?> processor) {
Random cancelDelayRandom = new Random();
ExecutorService executorService = Executors.newCachedThreadPool();
try {
List<Integer> list = IntStream.range(0, taskCount).boxed().collect(Collectors.toList());
list
.parallelStream()
.forEach(i -> {
QueryExecution qe;
try {
qe = qeFactory.call();
} catch (Exception e) {
throw new RuntimeException("Failed to build a query execution", e);
}
Future<?> future = executorService.submit(() -> processor.apply(qe));
int delayToAbort = cancelDelayRandom.nextInt(maxCancelDelay);
try {
Thread.sleep(delayToAbort);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// System.out.println("Abort: " + qe);
qe.abort();
try {
// System.out.println("Waiting for: " + qe);
future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (!(cause instanceof QueryCancelledException)) {
// Unexpected exception - print out the stack trace
e.printStackTrace();
}
Assert.assertEquals(QueryCancelledException.class, cause.getClass());
} catch (InterruptedException e) {
// Ignored
} finally {
// System.out.println("Completed: " + qe);
}
});
} finally {
executorService.shutdownNow();
}
}
}