Merge pull request #646 from strangepleasures/JENA-1785
JENA-1785: A newly created node can remain invisible after commit
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
index c3f2bfb..18b76cd 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
@@ -358,49 +358,7 @@
// components.add(tc);
// [1746]
-
- TransactionListener listener = new TransactionListener() {
- private final boolean PRT = false;
-
- @Override public void notifyTxnStart(Transaction transaction) {
- if ( PRT ) System.out.println("notifyTxnStart");
- if ( transaction.isWriteTxn() )
- nodeTableCache.updateStart();
- }
-
-
- @Override public void notifyPromoteFinish(Transaction transaction) {
- if ( transaction.isWriteTxn() )
- nodeTableCache.updateStart();
- if ( PRT ) System.out.println("notifyPromoteFinish");
- }
-
- @Override public void notifyAbortStart(Transaction transaction) {
- if ( PRT ) System.out.println("notifyAbortStart");
- if ( transaction.isWriteTxn() ) {
- //System.out.println(" "+transaction.getTxnId());
- //System.out.println(" "+transaction.getMode());
- nodeTableCache.updateAbort();
- }
- }
- //@Override public void notifyAbortFinish(Transaction transaction) {}
-
- /** Start prepare during a commit */
- @Override
- public void notifyPrepareStart(Transaction transaction) {
- if ( PRT ) System.out.println("notifyPrepareStart");
- }
-
- @Override public void notifyCommitFinish(Transaction transaction) {
- // Tell the NodeTableCache to update the main caches.
- // This must be after the underlying NodeTableNative has committed.
- if ( transaction.isWriteTxn() ) {
- // This is before "prepare" is called on components.
- nodeTableCache.updateCommit();
- }
- }
- };
- listeners.add(listener);
+ listeners.add(nodeTableCache);
}
nodeTable = NodeTableInline.create(nodeTable);
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
index 005fecc..0d3d7fd 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
@@ -27,6 +27,8 @@
import org.apache.jena.atlas.lib.CacheFactory;
import org.apache.jena.atlas.lib.Pair;
import org.apache.jena.atlas.logging.Log;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionListener;
import org.apache.jena.graph.Node;
import org.apache.jena.tdb2.TDBException;
import org.apache.jena.tdb2.params.StoreParams;
@@ -37,12 +39,12 @@
* wrapper. Read-cache - write caching is done via the object file used by the
* base NodeTable.
*/
-public class NodeTableCache implements NodeTable {
+public class NodeTableCache implements NodeTable, TransactionListener {
// These caches are updated together.
// See synchronization in _retrieveNodeByNodeId and _idForNode
// The cache is assumed to be single operation-thread-safe.
- // The buffering is for updates. Only the updating thread will see changes due to new nodes
+ // The buffering is for updates. Only the updating thread will see changes due to new nodes
// Case 1: Not in main "not-present"
// Add to local "not-present", flush down.
@@ -51,17 +53,18 @@
// Write back updates "not-present"
// Depends on "not-rpesent" used to protect the underlying "not-present"
-
-
+
+
private ThreadBufferingCache<Node, NodeId> node2id_Cache = null;
private ThreadBufferingCache<NodeId, Node> id2node_Cache = null;
// A small cache of "known unknowns" to speed up searching for impossible things.
// Cache update needed on NodeTable changes because a node may become "known"
- private ThreadBufferingCache<Node, Object> notPresent = null;
+ private Cache<Node, Object> notPresent = null;
private NodeTable baseTable;
private final Object lock = new Object();
-
+ private volatile Thread writingThread;
+
public static NodeTable create(NodeTable nodeTable, StoreParams params) {
int nodeToIdCacheSize = params.getNode2NodeIdCacheSize();
int idToNodeCacheSize = params.getNodeId2NodeCacheSize();
@@ -83,14 +86,14 @@
if ( idToNodeCacheSize > 0 )
id2node_Cache = createCache("idToNode", idToNodeCacheSize, 1000);
if ( nodeMissesCacheSize > 0 )
- notPresent = createCache("notPresent", nodeMissesCacheSize, 100);
+ notPresent = CacheFactory.createCache(nodeMissesCacheSize);
}
private static <Key, Value> ThreadBufferingCache<Key, Value> createCache(String label, int mainCachesize, int bufferSize) {
Cache<Key, Value> cache = CacheFactory.createCache(mainCachesize);
return new ThreadBufferingCache<>(label, cache, bufferSize);
}
-
+
// ---- Cache access, no going to underlying table.
public Node getNodeForNodeIdCache(NodeId id) {
@@ -220,7 +223,7 @@
nodeId = baseTable.getAllocateNodeId(node);
else {
if ( notPresent(node) )
- // Known not be in the baseTable.
+ // Known not be in the baseTable.
return NodeId.NodeDoesNotExist;
else
nodeId = baseTable.getNodeIdForNode(node);
@@ -234,7 +237,7 @@
// ----------------
// ---- Only places that the caches are touched
- /**
+ /**
* Test whether in the "not present" cache.
* True means "known to be absent from the baseTable".
*/
@@ -262,14 +265,6 @@
// Remember things known (currently) not to exist.
// Does not matter if notPresent is being updated elsewhere.
return node2id_Cache.getIfPresent(node);
-
- // XXX [467]
- // Pre JENA-1467 - can be deleted.
-// if ( notPresent != null && notPresent.containsKey(node) )
-// return null;
-// if ( node2id_Cache == null )
-// return null;
-// return node2id_Cache.getIfPresent(node);
}
/** Update the Node<->NodeId caches */
@@ -281,8 +276,9 @@
// The "notPresent" cache is used to note whether a node
// is known not to exist in the baseTable..
// This must be specially handled later if the node is added.
+ // Only top-level transactions can add nodes to the "notPresent" cache.
if ( NodeId.isDoesNotExist(id) ) {
- if ( notPresent != null )
+ if ( notPresent != null && inTopLevelTxn())
notPresent.put(node, Boolean.TRUE);
return;
}
@@ -301,40 +297,70 @@
notPresent.remove(node);
}
+ // True if this thread may record nodes in "notPresent": it is the active writer, or no write txn is active.
+ // NOTE(review): a reader's data version is not checked here, so a read txn begun before the latest
+ // commit also passes and could cache a now-committed node as absent -- confirm whether that can happen.
+ private boolean inTopLevelTxn() {
+ Thread writer = writingThread;
+ return (writer == null) || (writer == Thread.currentThread());
+ }
+
+ @Override
+ public void notifyTxnStart(Transaction transaction) { // Write txn: start buffering cache updates, remember the writer thread.
+ if (transaction.isWriteTxn())
+ updateStart();
+ }
+
+ @Override
+ public void notifyPromoteFinish(Transaction transaction) { // After promotion (presumably read->write): handled like a write txn start.
+ if(transaction.isWriteTxn())
+ updateStart();
+ }
+
+ @Override
+ public void notifyCompleteFinish(Transaction transaction) { // Presumably end of txn: flush buffers to the main caches, clear the writer.
+ if(transaction.isWriteTxn()) {
+ updateCommit(); // NOTE(review): replaces the old notifyCommitFinish hook; if this also runs after an abort it flushes buffers updateAbort already dropped -- confirm harmless.
+ }
+ }
+
+ @Override
+ public void notifyAbortStart(Transaction transaction) { // Write txn aborting: discard buffered cache entries, clear the writer.
+ if(transaction.isWriteTxn())
+ updateAbort();
+ }
+
// ----
// The cache is "optimistic" - nodes are added during the transaction.
- // It does not matter if they get added (and visible earlier)
+ // It does not matter if they get added (and visible earlier)
// because this is nothing more than "preallocation". Triples (Tuple of NodeIds) don't match.
-
+
// Underlying file has them "transactionally".
-
+
// On abort, it does need to be undone because the underlying NodeTable
// being cached will not have them.
-
- public void updateStart() {
- //System.out.println("updateStart: "+baseTable.toString());
+
+ private void updateStart() {
node2id_Cache.enableBuffering();
id2node_Cache.enableBuffering();
- notPresent.enableBuffering();
+ writingThread = Thread.currentThread();
}
-
- public void updateAbort() {
- //System.out.println("updateAbort: "+baseTable.toString());
+
+ private void updateAbort() {
+ writingThread = null;
+
node2id_Cache.dropBuffer();
id2node_Cache.dropBuffer();
- notPresent.dropBuffer();
}
-
- public void updateCommit() {
- //System.out.println("updateCommit: "+baseTable.toString());
- // Already in the baseTable.
+
+ private void updateCommit() {
+ writingThread = null;
// Write to main caches.
node2id_Cache.flushBuffer();
id2node_Cache.flushBuffer();
- notPresent.flushBuffer();
}
-
+
@Override
public boolean isEmpty() {
synchronized (lock) {
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java
new file mode 100644
index 0000000..0ac25c0
--- /dev/null
+++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.store;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.rdf.model.Resource;
+import org.apache.jena.rdf.model.ResourceFactory;
+import org.apache.jena.system.Txn;
+import org.apache.jena.tdb2.TDB2Factory;
+import org.apache.jena.vocabulary.RDFS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for JENA-1785: a node added by a write transaction must stay visible, both inside
+ * that transaction and after commit, even after a concurrent read transaction has looked it up
+ * while the write transaction was still active.
+ */
+public class TestVisibilityOfChanges {
+    /**
+     * Number of resources added by {@link #rinseCache()}. NOTE(review): presumably chosen to
+     * exceed the node table caches' per-thread buffer (1000 entries) -- confirm.
+     */
+    private static final int RINSE_COUNT = 2000;
+
+    private Dataset dataset;
+
+    @Before
+    public void before() {
+        dataset = TDB2Factory.connectDataset(Location.mem());
+    }
+
+    @After
+    public void after() {
+        dataset.close();
+    }
+
+    /** A resource added in a write transaction is visible inside it and after commit. */
+    @Test
+    public void visibilityOfNewlyCreatedResources() {
+        Resource resource = ResourceFactory.createResource();
+        assertFalse(Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+        Txn.executeWrite(dataset, () -> {
+            dataset.getDefaultModel().add(resource, RDFS.label, "I exist!");
+            assertTrue(dataset.getDefaultModel().containsResource(resource));
+            rinseCache();
+            assertTrue("A newly created resource should be visible within the write transaction",
+                       dataset.getDefaultModel().containsResource(resource));
+        });
+        assertTrue("A newly created resource should be visible after commit",
+                   Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+    }
+
+    /**
+     * A concurrent reader does not see an uncommitted resource, and its failed lookup must not
+     * hide that resource from the writer afterwards.
+     */
+    @Test
+    public void testIsolation() {
+        Resource resource = ResourceFactory.createResource();
+        Txn.executeWrite(dataset, () -> {
+            assertFalse(dataset.getDefaultModel().containsResource(resource));
+            dataset.getDefaultModel().add(resource, RDFS.label, "I exist!");
+            rinseCache();
+            // The reader runs on another thread while this write transaction is still active.
+            Boolean seenByReader = calculateInOtherThread(
+                () -> Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+            assertFalse("An uncommitted resource should not be visible to a concurrent read transaction",
+                        seenByReader);
+            assertTrue("A read transaction should not pollute the notPresent cache if there's a write transaction",
+                       dataset.getDefaultModel().containsResource(resource));
+        });
+    }
+
+    /**
+     * Adds {@link #RINSE_COUNT} fresh resources, each with an empty {@code rdfs:label}, so that
+     * entries for nodes added earlier are likely pushed out of the node table caches.
+     */
+    private void rinseCache() {
+        for (int i = 0; i < RINSE_COUNT; i++) {
+            Resource newResource = ResourceFactory.createResource();
+            dataset.getDefaultModel().add(newResource, RDFS.label, "");
+        }
+    }
+
+    /** Runs {@code supplier} on a new thread, waits for it and returns its result. */
+    private static <T> T calculateInOtherThread(Supplier<T> supplier) {
+        AtomicReference<T> result = new AtomicReference<>();
+        executeAndWait(() -> result.set(supplier.get()));
+        return result.get();
+    }
+
+    /**
+     * Runs {@code runnable} on a new thread and waits for it to finish. Anything thrown on that
+     * thread (including assertion failures) is rethrown here; otherwise it would only reach the
+     * thread's uncaught-exception handler and the test would still pass.
+     */
+    private static void executeAndWait(Runnable runnable) {
+        AtomicReference<Throwable> failure = new AtomicReference<>();
+        Thread thread = new Thread(() -> {
+            try {
+                runnable.run();
+            } catch (Throwable th) {
+                failure.set(th);
+            }
+        });
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status before failing.
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for the worker thread");
+        }
+        Throwable th = failure.get();
+        if (th instanceof RuntimeException)
+            throw (RuntimeException) th;
+        if (th instanceof Error)
+            throw (Error) th;
+        if (th != null)
+            throw new AssertionError("Worker thread failed", th);
+    }
+}
\ No newline at end of file