| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.jena.sparql.exec; |
| |
| import java.util.*; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| |
| import org.apache.jena.atlas.iterator.Iter; |
| import org.apache.jena.atlas.json.JsonArray; |
| import org.apache.jena.atlas.json.JsonObject; |
| import org.apache.jena.atlas.lib.Alarm; |
| import org.apache.jena.atlas.lib.AlarmClock; |
| import org.apache.jena.atlas.logging.Log; |
| import org.apache.jena.graph.Graph; |
| import org.apache.jena.graph.Node; |
| import org.apache.jena.graph.Triple; |
| import org.apache.jena.query.Query; |
| import org.apache.jena.query.QueryCancelledException; |
| import org.apache.jena.query.QueryExecException; |
| import org.apache.jena.rdf.model.Model; |
| import org.apache.jena.rdf.model.ModelFactory; |
| import org.apache.jena.rdf.model.RDFNode; |
| import org.apache.jena.rdf.model.Resource; |
| import org.apache.jena.riot.system.PrefixMap; |
| import org.apache.jena.sparql.ARQConstants; |
| import org.apache.jena.sparql.core.DatasetGraph; |
| import org.apache.jena.sparql.core.Quad; |
| import org.apache.jena.sparql.core.Var; |
| import org.apache.jena.sparql.core.describe.DescribeHandler; |
| import org.apache.jena.sparql.core.describe.DescribeHandlerRegistry; |
| import org.apache.jena.sparql.engine.Plan; |
| import org.apache.jena.sparql.engine.QueryEngineFactory; |
| import org.apache.jena.sparql.engine.QueryIterator; |
| import org.apache.jena.sparql.engine.binding.Binding; |
| import org.apache.jena.sparql.engine.binding.BindingFactory; |
| import org.apache.jena.sparql.engine.iterator.QueryIteratorWrapper; |
| import org.apache.jena.sparql.graph.GraphOps; |
| import org.apache.jena.sparql.modify.TemplateLib; |
| import org.apache.jena.sparql.syntax.ElementGroup; |
| import org.apache.jena.sparql.syntax.Template; |
| import org.apache.jena.sparql.util.Context; |
| import org.apache.jena.sparql.util.ModelUtils; |
| |
| /** All the SPARQL query result forms at the graph-level. */ |
| |
| public class QueryExecDataset implements QueryExec |
| { |
| public static QueryExecDatasetBuilder newBuilder() { return QueryExecDatasetBuilder.create(); } |
| |
| private final Query query; |
| private String queryString = null; |
| private final QueryEngineFactory qeFactory; |
| private final Context context; |
| private final DatasetGraph dataset; |
| |
| private QueryIterator queryIterator = null; |
| private Plan plan = null; |
| private Binding initialBinding = null; |
| |
| private boolean closed; |
| private AtomicReference<TimeoutCallback> expectedCallback = new AtomicReference<>(null); |
| private Alarm timeout1Alarm = null; |
| private Alarm timeout2Alarm = null; |
| |
| // synchronization. |
| private final Object lockTimeout = new Object(); |
| private static final long TIMEOUT_UNSET = -1; |
| private static final long TIMEOUT_INF = -2; |
| private long timeout1 = TIMEOUT_UNSET; |
| private long timeout2 = TIMEOUT_UNSET; |
| private final AlarmClock alarmClock = AlarmClock.get(); |
| private long queryStartTime = -1; // Unset |
| private AtomicBoolean cancelSignal = new AtomicBoolean(false); |
| |
/**
 * Create a query execution over a dataset.
 *
 * @param query           the query to execute; when non-null it is recorded in the context
 * @param queryString     the query text, or null to derive it from {@code query} on demand
 * @param datasetGraph    the dataset to execute against
 * @param cxt             the execution context; when null one is set up for the dataset
 * @param qeFactory       factory used to create the execution plan
 * @param timeout1        first timeout; a negative value is stored unchanged (meaning "not set")
 * @param timeUnit1       unit of {@code timeout1}
 * @param timeout2        second timeout; a negative value is stored unchanged (meaning "not set")
 * @param timeUnit2       unit of {@code timeout2}
 * @param initialToEngine initial binding passed to the query engine; may be null
 */
protected QueryExecDataset(Query query, String queryString, DatasetGraph datasetGraph, Context cxt,
                           QueryEngineFactory qeFactory,
                           long timeout1, TimeUnit timeUnit1, long timeout2, TimeUnit timeUnit2,
                           Binding initialToEngine) {
    this.query = query;
    this.queryString = queryString;
    this.dataset = datasetGraph;
    this.qeFactory = qeFactory;

    // A non-null context is already a safe copy and is used directly.
    if ( cxt != null )
        this.context = cxt;
    else
        this.context = Context.setupContextForDataset(cxt, datasetGraph);

    // Timeouts are held internally in milliseconds.
    this.timeout1 = asMillis(timeout1, timeUnit1);
    this.timeout2 = asMillis(timeout2, timeUnit2);

    // See also query substitution handled in QueryExecBuilder
    this.initialBinding = initialToEngine;

    init();
}
| |
| private void init() { |
| Context.setCurrentDateTime(context); |
| if ( query != null ) |
| context.put(ARQConstants.sysCurrentQuery, query); |
| } |
| |
| private static long asMillis(long duration, TimeUnit timeUnit) { |
| return (duration < 0) ? duration : timeUnit.toMillis(duration); |
| } |
| |
| @Override |
| public void close() { |
| closed = true; |
| if ( queryIterator != null ) |
| queryIterator.close(); |
| if ( plan != null ) |
| plan.close(); |
| if ( timeout1Alarm != null ) |
| alarmClock.cancel(timeout1Alarm); |
| if ( timeout2Alarm != null ) |
| alarmClock.cancel(timeout2Alarm); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return closed; |
| } |
| |
| private void checkNotClosed() { |
| if ( closed ) |
| throw new QueryExecException("HTTP QueryExecution has been closed"); |
| } |
| |
| @Override |
| public void abort() { |
| cancelSignal.set(true); |
| synchronized (lockTimeout) { |
| // This is called asynchronously to the execution. |
| // synchronized is for coordination with other calls of |
| // .abort and with the timeout2 reset code. |
| if ( queryIterator != null ) |
| // we notify the chain of iterators, |
| // however, we do *not* close the |
| // iterators. |
| // That happens after the cancellation |
| // is properly over. |
| queryIterator.cancel(); |
| } |
| } |
| |
| @Override |
| public RowSet select() { |
| checkNotClosed(); |
| if ( !query.isSelectType() ) |
| throw new QueryExecException("Attempt to have RowSet from a " + labelForQuery(query) + " query"); |
| RowSet rowSet = execute(); |
| return rowSet; |
| } |
| |
| private RowSet execute() { |
| startQueryIterator(); |
| List<Var> vars = query.getResultVars().stream().map(Var::alloc).collect(Collectors.toList()); |
| return RowSetStream.create(vars, queryIterator); |
| } |
| |
| // -- Construct |
| @Override |
| public Graph construct(Graph graph) { |
| checkNotClosed(); |
| try { |
| Iterator<Triple> it = constructTriples(); |
| // Prefixes for result |
| insertPrefixesInto(graph); |
| GraphOps.addAll(graph, it); |
| } |
| finally { |
| this.close(); |
| } |
| return graph; |
| } |
| |
| @Override |
| public Iterator<Triple> constructTriples() { |
| checkNotClosed(); |
| if ( !query.isConstructType() ) |
| throw new QueryExecException("Attempt to get a CONSTRUCT model from a " + labelForQuery(query) + " query"); |
| // This causes there to be no PROJECT around the pattern. |
| // That in turn, exposes the initial bindings. |
| query.setQueryResultStar(true); |
| |
| startQueryIterator(); |
| |
| Template template = query.getConstructTemplate(); |
| return TemplateLib.calcTriples(template.getTriples(), queryIterator); |
| } |
| |
| // -- Construct Quads |
| |
| @Override |
| public Iterator<Quad> constructQuads() { |
| checkNotClosed(); |
| if ( !query.isConstructType() ) |
| throw new QueryExecException("Attempt to get a CONSTRUCT model from a " + labelForQuery(query) + " query"); |
| // This causes there to be no PROJECT around the pattern. |
| // That in turn, exposes the initial bindings. |
| query.setQueryResultStar(true); |
| |
| startQueryIterator(); |
| |
| Template template = query.getConstructTemplate(); |
| return TemplateLib.calcQuads(template.getQuads(), queryIterator); |
| } |
| |
| @Override |
| public DatasetGraph constructDataset(DatasetGraph dataset) { |
| try { |
| Iterator<Quad> iter = constructQuads(); |
| iter.forEachRemaining(dataset::add); |
| Iter.close(iter); |
| insertPrefixesInto(dataset); |
| } finally { |
| this.close(); |
| } |
| return dataset; |
| } |
| |
| // -- Describe |
| |
| @Override |
| public Graph describe(Graph graph) { |
| checkNotClosed(); |
| Model model = ModelFactory.createModelForGraph(graph); |
| |
| if ( !query.isDescribeType() ) |
| throw new QueryExecException("Attempt to get a DESCRIBE result from a " + labelForQuery(query) + " query"); |
| query.setResultVars(); |
| // If there was no WhereClause, use an empty pattern (one solution, no |
| // columns). |
| if ( query.getQueryPattern() == null ) |
| query.setQueryPattern(new ElementGroup()); |
| |
| Set<Node> set = new HashSet<>(); |
| |
| RowSet rows = execute(); |
| |
| // Prefixes for result (after initialization) |
| insertPrefixesInto(graph); |
| |
| // Variables in DESCRIBE |
| if ( rows != null ) { |
| // Single pass over rows. |
| rows.forEachRemaining(row->{ |
| for ( Var var : rows.getResultVars() ) { |
| Node n = row.get(var); |
| if ( n != null ) |
| set.add(n); |
| } |
| }); |
| } |
| |
| // Any URIs in the DESCRIBE |
| if ( query.getResultURIs() != null ) { |
| query.getResultURIs().forEach(set::add); |
| } |
| |
| // DescribeHandlers work on models. |
| |
| // Create new handlers for this process. |
| List<DescribeHandler> dhList = DescribeHandlerRegistry.get().newHandlerList(); |
| getContext().put(ARQConstants.sysCurrentDataset, getDataset()); |
| // Notify start of describe phase |
| for ( DescribeHandler dh : dhList ) |
| dh.start(model, getContext()); |
| |
| // Do describe for each resource found. |
| for ( Node n : set ) { |
| RDFNode rdfNode = ModelUtils.convertGraphNodeToRDFNode(n, model); |
| |
| if ( rdfNode instanceof Resource ) { |
| for ( DescribeHandler dh : dhList ) { |
| dh.describe((Resource)rdfNode); |
| } |
| } else { |
| // Can't describe literals |
| continue; |
| } |
| } |
| |
| for ( DescribeHandler dh : dhList ) |
| dh.finish(); |
| |
| this.close(); |
| return graph; |
| } |
| |
| // ?? Change to iterator from Describe Handlers. |
| // (Streaming DESCRIBE isn't important enough to worry about) |
| @Override |
| public Iterator<Triple> describeTriples() { |
| return describe().find(); |
| } |
| |
| @Override |
| public boolean ask() { |
| checkNotClosed(); |
| if ( !query.isAskType() ) |
| throw new QueryExecException("Attempt to have boolean from a " + labelForQuery(query) + " query"); |
| |
| startQueryIterator(); |
| |
| boolean r; |
| try { |
| // Not hasNext because setting timeout1 which applies to getting |
| // the first result, not testing for it. |
| queryIterator.next(); |
| r = true; |
| } catch (NoSuchElementException ex) { |
| r = false; |
| } finally { |
| this.close(); |
| } |
| return r; |
| } |
| |
| @Override |
| public JsonArray execJson() { |
| checkNotClosed(); |
| if ( !query.isJsonType() ) |
| throw new QueryExecException("Attempt to get a JSON result from a " + labelForQuery(query) + " query"); |
| startQueryIterator(); |
| JsonArray jsonArray = JsonResults.results(queryIterator, query.getJsonMapping()); |
| List<String> resultVars = query.getResultVars(); |
| return jsonArray; |
| } |
| |
| @Override |
| public Iterator<JsonObject> execJsonItems() { |
| checkNotClosed(); |
| if ( !query.isJsonType() ) |
| throw new QueryExecException("Attempt to get a JSON result from a " + labelForQuery(query) + " query"); |
| startQueryIterator(); |
| return JsonResults.iterator(queryIterator, query.getJsonMapping()); |
| } |
| |
| private static boolean isTimeoutSet(long x) { |
| return x >= 0; |
| } |
| |
| class TimeoutCallback implements Runnable { |
| @Override |
| public void run() { |
| synchronized (lockTimeout) { |
| // Abort query if and only if we are the expected callback. |
| // If the first row has appeared, and we are removing timeout1 |
| // callback, it still may go off so it needs to check here |
| // it's still wanted. |
| if ( expectedCallback.get() == this ) { |
| if ( cancelSignal != null ) |
| cancelSignal.set(true); |
| } |
| } |
| } |
| } |
| |
| private class QueryIteratorTimer2 extends QueryIteratorWrapper { |
| |
| public QueryIteratorTimer2(QueryIterator qIter) { |
| super(qIter); |
| } |
| |
| long yieldCount = 0; |
| boolean resetDone = false; |
| @Override |
| protected Binding moveToNextBinding() |
| { |
| Binding b = super.moveToNextBinding(); |
| yieldCount++; |
| |
| if ( ! resetDone ) |
| { |
| // Sync on calls of .abort. |
| // So nearly not needed. |
| synchronized(lockTimeout) |
| { |
| TimeoutCallback callback = new TimeoutCallback(); |
| expectedCallback.set(callback); |
| // Lock against calls of .abort() or of timeout1Callback. |
| |
| // Update/check the volatiles in a careful order. |
| // This cause timeout1 not to call .abort and hence not set isCancelled |
| |
| // But if timeout1 went off after moveToNextBinding, before expectedCallback is set, |
| // then forget the row and cancel the query. |
| if ( cancelSignal.get() ) |
| // timeout1 went off after the binding was yielded but |
| // before we got here. |
| throw new QueryCancelledException(); |
| if ( timeout1Alarm != null ) { |
| alarmClock.cancel(timeout1Alarm); |
| timeout1Alarm = null; |
| } |
| |
| // Now arm the second timeout, if any. |
| if ( timeout2 > 0 ) { |
| // Set to time remaining. |
| long t = timeout2 - (System.currentTimeMillis()-queryStartTime); |
| // Not first timeout - finite second timeout for remaining time. |
| timeout2Alarm = alarmClock.add(callback, t); |
| } |
| resetDone = true; |
| } |
| } |
| return b; |
| } |
| } |
| |
| protected void execInit() { |
| if ( queryStartTime <= -1 ) |
| queryStartTime = System.currentTimeMillis(); |
| } |
| |
| /** Wrapper for starting the query iterator but also dealing with cancellation */ |
| private void startQueryIterator() { |
| synchronized (lockTimeout) { |
| if (cancelSignal.get()) { |
| // Fail before starting the iterator if cancelled already |
| throw new QueryCancelledException(); |
| } |
| |
| startQueryIteratorActual(); |
| |
| if (cancelSignal.get()) { |
| queryIterator.cancel(); |
| |
| // Fail now if cancelled already |
| throw new QueryCancelledException(); |
| } |
| } |
| } |
| |
| /** Start the query iterator, setting timeouts as needed. */ |
| private void startQueryIteratorActual() { |
| if ( queryIterator != null ) { |
| Log.warn(this, "Query iterator has already been started"); |
| return; |
| } |
| |
| execInit(); |
| |
| /* Timeouts: |
| * -1,-1 No timeouts |
| * N, same as -1,N Overall timeout only. No wrapper needed. |
| * N,-1 Timeout on first row only. Need to cancel on first row. |
| * N,M First/overall timeout. Need to reset on first row. |
| */ |
| |
| if ( !isTimeoutSet(timeout1) && !isTimeoutSet(timeout2) ) { |
| // Case -1,-1 |
| queryIterator = getPlan().iterator(); |
| return; |
| } |
| |
| // JENA-2141 - the timeout can go off while building the query iterator structure. |
| // In this case, use a signal passed through the context. |
| // We don't know if getPlan().iterator() does a lot of work or not |
| // (ideally it shouldn't start executing the query but in some sub-systems |
| // it might be necessary) |
| // |
| // This applies to the time to first result because to get the first result, the |
| // queryIterator must have been built. So it does not apply for the second |
| // stage of N,-1 or N,M. |
| context.set(ARQConstants.symCancelQuery, cancelSignal); |
| TimeoutCallback callback = new TimeoutCallback() ; |
| expectedCallback.set(callback) ; |
| |
| if ( !isTimeoutSet(timeout1) && isTimeoutSet(timeout2) ) { |
| // Case -1,N |
| timeout2Alarm = alarmClock.add(callback, timeout2) ; |
| // Start the query. |
| queryIterator = getPlan().iterator(); |
| // But don't add resetter. |
| return ; |
| } |
| |
| // Case N,-1 |
| // Case N,M |
| // Case isTimeoutSet(timeout1) |
| // Whether timeout2 is set is determined by QueryIteratorTimer2 |
| // Subcase 2: ! isTimeoutSet(timeout2) |
| // Add timeout to first row. |
| |
| timeout1Alarm = alarmClock.add(callback, timeout1) ; |
| |
| queryIterator = getPlan().iterator(); |
| // Add the timeout1->timeout2 resetter wrapper. |
| queryIterator = new QueryIteratorTimer2(queryIterator); |
| } |
| |
| private Plan getPlan() { |
| if ( plan == null ) { |
| Binding initial = ( initialBinding != null ) ? initialBinding : BindingFactory.root(); |
| plan = qeFactory.create(query, dataset, initial, getContext()); |
| } |
| return plan; |
| } |
| |
| private void insertPrefixesInto(Graph graph) { |
| try { |
| if ( dataset != null ) { |
| // Load the models prefixes first |
| PrefixMap m = dataset.prefixes(); |
| m.forEach((prefix, uri) -> graph.getPrefixMapping().setNsPrefix(prefix, uri)); |
| } |
| // Then add the queries (just the declared mappings) |
| // so the query declarations override the data sources. |
| graph.getPrefixMapping().setNsPrefixes(query.getPrefixMapping()); |
| } catch (Exception ex) { |
| Log.warn(this, "Exception in insertPrefixes: " + ex.getMessage(), ex); |
| } |
| } |
| |
| private void insertPrefixesInto(DatasetGraph dsg) { |
| try { |
| PrefixMap pmap = dsg.prefixes(); |
| |
| if ( dataset != null ) { |
| // Load the models prefixes first |
| pmap.putAll(dataset.prefixes()); |
| } |
| // Then add the queries (just the declared mappings) |
| // so the query declarations override the data sources. |
| query.getPrefixMapping().getNsPrefixMap().forEach((prefix, uri) -> pmap.add(prefix, uri)); |
| } catch (Exception ex) { |
| Log.warn(this, "Exception in insertPrefixes: " + ex.getMessage(), ex); |
| } |
| |
| } |
| |
| static private String labelForQuery(Query q) { |
| if ( q.isSelectType() ) return "SELECT"; |
| if ( q.isConstructType() ) return "CONSTRUCT"; |
| if ( q.isDescribeType() ) return "DESCRIBE"; |
| if ( q.isAskType() ) return "ASK"; |
| if ( q.isJsonType() ) return "JSON"; |
| return "<<unknown>>"; |
| } |
| |
| @Override |
| public Context getContext() { return context; } |
| |
| @Override |
| public DatasetGraph getDataset() { return dataset; } |
| |
| @Override |
| public Query getQuery() { return query; } |
| |
| @Override |
| public String getQueryString() { |
| if ( queryString == null ) |
| queryString = query.toString(); |
| return queryString; |
| } |
| |
| } |