| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.jena.sparql.core.mem; |
| |
| import static java.lang.ThreadLocal.withInitial; |
| import static org.apache.jena.graph.Node.ANY; |
| import static org.apache.jena.query.ReadWrite.WRITE; |
| import static org.apache.jena.sparql.core.Quad.isUnionGraph; |
| import static org.apache.jena.sparql.util.graph.GraphUtils.triples2quadsDftGraph ; |
| import static org.apache.jena.system.Txn.calculateRead; |
| import static org.apache.jena.system.Txn.executeWrite; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| import java.util.Iterator; |
| import java.util.concurrent.atomic.AtomicLong ; |
| import java.util.concurrent.locks.ReentrantLock ; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| |
| import org.apache.jena.atlas.lib.InternalErrorException ; |
| import org.apache.jena.graph.*; |
| import org.apache.jena.query.ReadWrite; |
| import org.apache.jena.query.TxnType; |
| import org.apache.jena.shared.Lock; |
| import org.apache.jena.shared.LockMRPlusSW; |
| import org.apache.jena.sparql.JenaTransactionException; |
| import org.apache.jena.sparql.core.* ; |
| import org.slf4j.Logger; |
| |
| /** |
| * A {@link DatasetGraph} backed by an {@link QuadTable}. By default, this is a |
| * {@link HexTable} designed for high-speed in-memory operation. |
| */ |
| public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements Transactional { |
| |
| private static final Logger log = getLogger(DatasetGraphInMemory.class); |
| |
| private final DatasetPrefixStorage prefixes = new DatasetPrefixStorageInMemory(); |
| |
| /** This lock imposes the multiple-reader and single-writer policy of transactions */ |
| private final Lock transactionLock = new LockMRPlusSW(); |
| |
| /** |
| * Transaction lifecycle operations must be atomic, especially |
| * {@link Transactional#begin} and {@link Transactional#commit}. |
| * <p> |
| * There are changes to be made to several datastructures and this |
| * insures that they are made consistently. |
| */ |
| private final ReentrantLock systemLock = new ReentrantLock(true); |
| |
| /** |
| * Dataset version. |
| * A write transaction increments this in commit. |
| */ |
| private final AtomicLong generation = new AtomicLong(0) ; |
| private final ThreadLocal<Long> version = withInitial(() -> 0L); |
| |
| private final ThreadLocal<Boolean> isInTransaction = withInitial(() -> false); |
| |
| @Override |
| public boolean isInTransaction() { |
| return isInTransaction.get(); |
| } |
| |
| protected void isInTransaction(final boolean b) { |
| isInTransaction.set(b); |
| } |
| |
| private final ThreadLocal<TxnType> transactionType = withInitial(() -> null); |
| // Current state. |
| private final ThreadLocal<ReadWrite> transactionMode = withInitial(() -> null); |
| |
| /** |
| * @return the current mode of the transaction in progress |
| */ |
| @Override |
| public ReadWrite transactionMode() { |
| return transactionMode.get(); |
| } |
| |
| @Override |
| public TxnType transactionType() { |
| return transactionType.get(); |
| } |
| |
| private void transactionMode(final ReadWrite readWrite) { |
| transactionMode.set(readWrite); |
| } |
| |
| private final QuadTable quadsIndex; |
| |
| private QuadTable quadsIndex() { |
| return quadsIndex; |
| } |
| |
| private final TripleTable defaultGraph; |
| |
| private TripleTable defaultGraph() { |
| return defaultGraph; |
| } |
| |
| /** |
| * Default constructor. |
| */ |
| public DatasetGraphInMemory() { |
| this(new HexTable(), new TriTable()); |
| } |
| |
| /** |
| * @param i a table in which to store quads |
| * @param t a table in which to store triples |
| */ |
| public DatasetGraphInMemory(final QuadTable i, final TripleTable t) { |
| this.quadsIndex = i; |
| this.defaultGraph = t; |
| } |
| |
| @Override |
| public boolean supportsTransactions() { return true; } |
| @Override |
| public boolean supportsTransactionAbort() { return true; } |
| |
| @Override |
| public void begin(final ReadWrite readWrite) { |
| begin(TxnType.convert(readWrite)); |
| } |
| |
| @Override |
| public void begin(TxnType txnType) { |
| if (isInTransaction()) |
| throw new JenaTransactionException("Transactions cannot be nested!"); |
| transactionType.set(txnType); |
| _begin(txnType, TxnType.initial(txnType)); |
| } |
| |
| private void _begin(TxnType txnType, ReadWrite readWrite) { |
| // Takes transactionLock |
| startTransaction(txnType, readWrite); |
| withLock(systemLock, () ->{ |
| quadsIndex().begin(readWrite); |
| defaultGraph().begin(readWrite); |
| version.set(generation.get()); |
| }) ; |
| } |
| |
| /** Called transaction start code at most once per transaction. */ |
| private void startTransaction(TxnType txnType, ReadWrite mode) { |
| transactionLock.enterCriticalSection(mode.equals(ReadWrite.READ)); // get the dataset write lock, if needed. |
| transactionType.set(txnType); |
| transactionMode(mode); |
| isInTransaction(true); |
| } |
| |
| /** Called transaction ending code at most once per transaction. */ |
| private void finishTransaction() { |
| isInTransaction.remove(); |
| transactionType.remove(); |
| transactionMode.remove(); |
| version.remove(); |
| transactionLock.leaveCriticalSection(); |
| } |
| |
| @Override |
| public boolean promote(Promote promoteMode) { |
| if (!isInTransaction()) |
| throw new JenaTransactionException("Tried to promote outside a transaction!"); |
| if ( transactionMode().equals(ReadWrite.WRITE) ) |
| return true; |
| |
| if ( transactionType() == TxnType.READ ) |
| return false; |
| |
| boolean readCommitted = (promoteMode == Promote.READ_COMMITTED); |
| |
| try { |
| _promote(readCommitted); |
| return true; |
| } catch (JenaTransactionException ex) { |
| return false ; |
| } |
| } |
| |
| private void _promote(boolean readCommited) { |
| // Outside lock. |
| if ( ! readCommited && version.get() != generation.get() ) { |
| // This tests for any commited writers since this transaction started. |
| // This does not catch the case of a currently active writer |
| // that has not gone to commit or abort yet. |
| // The final test is after we obtain the transactionLock. |
| throw new JenaTransactionException("Dataset changed - can't promote") ; |
| } |
| |
| // Blocking on other writers. |
| transactionLock.enterCriticalSection(Lock.WRITE); |
| // Check again now we are inside the lock. |
| if ( ! readCommited && version.get() != generation.get() ) { |
| // Can't promote - release the lock. |
| transactionLock.leaveCriticalSection(); |
| throw new JenaTransactionException("Concurrent writer changed the dataset : can't promote") ; |
| } |
| // We have the lock and we have promoted! |
| transactionMode(WRITE); |
| _begin(transactionType(), ReadWrite.WRITE) ; |
| } |
| |
| @Override |
| public void commit() { |
| if (!isInTransaction()) |
| throw new JenaTransactionException("Tried to commit outside a transaction!"); |
| if (transactionMode().equals(WRITE)) |
| _commit(); |
| finishTransaction(); |
| } |
| |
| private void _commit() { |
| withLock(systemLock, () -> { |
| quadsIndex().commit(); |
| defaultGraph().commit(); |
| quadsIndex().end(); |
| defaultGraph().end(); |
| |
| if ( transactionMode().equals(WRITE) ) { |
| if ( version.get() != generation.get() ) |
| throw new InternalErrorException(String.format("Version=%d, Generation=%d",version.get(),generation.get())) ; |
| generation.incrementAndGet() ; |
| } |
| } ) ; |
| } |
| |
| @Override |
| public void abort() { |
| if (!isInTransaction()) |
| throw new JenaTransactionException("Tried to abort outside a transaction!"); |
| if (transactionMode().equals(WRITE)) |
| _abort(); |
| finishTransaction(); |
| } |
| |
| private void _abort() { |
| withLock(systemLock, () -> { |
| quadsIndex().abort(); |
| defaultGraph().abort(); |
| quadsIndex().end(); |
| defaultGraph().end(); |
| } ) ; |
| } |
| |
| @Override |
| public void close() { |
| if (isInTransaction()) |
| abort(); |
| } |
| |
| @Override |
| public void end() { |
| if (isInTransaction()) { |
| if (transactionMode().equals(WRITE)) { |
| String msg = "end() called for WRITE transaction without commit or abort having been called. This causes a forced abort."; |
| // _abort does _end actions inside the lock. |
| _abort() ; |
| finishTransaction(); |
| throw new JenaTransactionException(msg); |
| } else { |
| _end() ; |
| } |
| finishTransaction(); |
| } |
| } |
| |
| private void _end() { |
| withLock(systemLock, () -> { |
| quadsIndex().end(); |
| defaultGraph().end(); |
| } ) ; |
| } |
| |
| private static void withLock(java.util.concurrent.locks.Lock lock, Runnable action) { |
| lock.lock(); |
| try { action.run(); } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| private <T> Iterator<T> access(final Supplier<Iterator<T>> source) { |
| return isInTransaction() ? source.get() : calculateRead(this, source::get); |
| } |
| |
| @Override |
| public Iterator<Node> listGraphNodes() { |
| return access(() -> quadsIndex().listGraphNodes().iterator()); |
| } |
| |
| private Iterator<Quad> quadsFinder(final Node g, final Node s, final Node p, final Node o) { |
| if (isUnionGraph(g)) return findInUnionGraph$(s, p, o); |
| return quadsIndex().find(g, s, p, o).iterator(); |
| } |
| |
| /** |
| * Union graph is the merge of named graphs. |
| */ |
| // Temp - Should this be replaced by DatasetGraphBaseFind code? |
| private Iterator<Quad> findInUnionGraph$(final Node s, final Node p, final Node o) { |
| return access(() -> quadsIndex().findInUnionGraph(s, p, o).iterator()); |
| } |
| |
| private Iterator<Quad> triplesFinder(final Node s, final Node p, final Node o) { |
| return triples2quadsDftGraph(defaultGraph().find(s, p, o).iterator()); |
| } |
| |
| @Override |
| public void setDefaultGraph(final Graph g) { |
| mutate(graph -> { |
| defaultGraph().clear(); |
| graph.find().forEachRemaining(defaultGraph()::add); |
| }, g); |
| } |
| |
| @Override |
| public Graph getGraph(final Node graphNode) { |
| return new GraphInMemory(this, graphNode); |
| } |
| |
| @Override |
| public Graph getDefaultGraph() { |
| return getGraph(Quad.defaultGraphNodeGenerated); |
| } |
| |
| @Override |
| public Graph getUnionGraph() { |
| return getGraph(Quad.unionGraph); |
| } |
| |
| private Consumer<Graph> addGraph(final Node name) { |
| return g -> g.find().mapWith(t -> new Quad(name, t)).forEachRemaining(this::add); |
| } |
| |
| private final Consumer<Graph> removeGraph = g -> g.find().forEachRemaining(g::delete); |
| |
| @Override |
| public void addGraph(final Node graphName, final Graph graph) { |
| mutate(addGraph(graphName), graph); |
| } |
| |
| @Override |
| public void removeGraph(final Node graphName) { |
| mutate(removeGraph, getGraph(graphName)); |
| prefixes().removeAllFromPrefixMap(graphName.getURI()) ; |
| } |
| |
| /** |
| * Wrap a mutation in a WRITE transaction iff necessary. |
| * |
| * @param mutator |
| * @param payload |
| */ |
| private <T> void mutate(final Consumer<T> mutator, final T payload) { |
| if (isInTransaction()) { |
| if (!transactionMode().equals(WRITE)) { |
| TxnType mode = transactionType.get(); |
| switch (mode) { |
| case WRITE: |
| break; |
| case READ: |
| throw new JenaTransactionException("Tried to write inside a READ transaction!"); |
| case READ_COMMITTED_PROMOTE: |
| case READ_PROMOTE: |
| boolean readCommitted = (mode == TxnType.READ_COMMITTED_PROMOTE); |
| _promote(readCommitted); |
| break; |
| } |
| } |
| mutator.accept(payload); |
| } else executeWrite(this, () -> mutator.accept(payload)); |
| } |
| |
| /** |
| * @return the prefixes in use in this dataset |
| */ |
| public DatasetPrefixStorage prefixes() { |
| return prefixes; |
| } |
| |
| @Override |
| public long size() { |
| return quadsIndex().listGraphNodes().count() ; |
| } |
| |
| @Override |
| public void clear() { |
| mutate(x -> { |
| defaultGraph().clear(); |
| quadsIndex().clear(); |
| } , null); |
| } |
| |
| @Override |
| protected void addToDftGraph(final Node s, final Node p, final Node o) { |
| mutate(defaultGraph()::add, Triple.create(s, p, o)); |
| } |
| |
| @Override |
| protected void addToNamedGraph(final Node g, final Node s, final Node p, final Node o) { |
| mutate(quadsIndex()::add, Quad.create(g, s, p, o)); |
| } |
| |
| @Override |
| protected void deleteFromDftGraph(final Node s, final Node p, final Node o) { |
| mutate(defaultGraph()::delete, Triple.create(s, p, o)); |
| } |
| |
| @Override |
| protected void deleteFromNamedGraph(final Node g, final Node s, final Node p, final Node o) { |
| mutate(quadsIndex()::delete, Quad.create(g, s, p, o)); |
| } |
| |
| @Override |
| protected Iterator<Quad> findInDftGraph(final Node s, final Node p, final Node o) { |
| return access(() -> triplesFinder(s, p, o)); |
| } |
| |
| @Override |
| protected Iterator<Quad> findInSpecificNamedGraph(final Node g, final Node s, final Node p, final Node o) { |
| return access(() -> quadsFinder(g, s, p, o)); |
| } |
| |
| @Override |
| protected Iterator<Quad> findInAnyNamedGraphs(final Node s, final Node p, final Node o) { |
| return findInSpecificNamedGraph(ANY, s, p, o); |
| } |
| } |