Merge pull request #646 from strangepleasures/JENA-1785

JENA-1785: A newly created node can remain invisible after commit
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
index c3f2bfb..18b76cd 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
@@ -358,49 +358,7 @@
 //            components.add(tc);
 
             // [1746]
-            
-            TransactionListener listener = new TransactionListener() {
-                private final boolean PRT = false;
-                
-                @Override public void notifyTxnStart(Transaction transaction) { 
-                    if ( PRT ) System.out.println("notifyTxnStart");
-                    if ( transaction.isWriteTxn() )
-                        nodeTableCache.updateStart();
-                }   
-                
-                
-                @Override public void notifyPromoteFinish(Transaction transaction) {
-                    if ( transaction.isWriteTxn() )
-                        nodeTableCache.updateStart();
-                    if ( PRT ) System.out.println("notifyPromoteFinish");
-                }
-                
-                @Override public void notifyAbortStart(Transaction transaction) {
-                    if ( PRT ) System.out.println("notifyAbortStart");
-                    if ( transaction.isWriteTxn() ) {
-                        //System.out.println("    "+transaction.getTxnId());
-                        //System.out.println("    "+transaction.getMode());
-                        nodeTableCache.updateAbort();
-                    }
-                }
-                //@Override public void notifyAbortFinish(Transaction transaction) {} 
-
-                /** Start prepare during a commit */
-                @Override
-                public void notifyPrepareStart(Transaction transaction) {
-                    if ( PRT ) System.out.println("notifyPrepareStart");
-                }
-                
-                @Override public void notifyCommitFinish(Transaction transaction) { 
-                    // Tell the NodeTableCache to update the main caches. 
-                    // This must be after the underlying NodeTableNative has committed. 
-                    if ( transaction.isWriteTxn() ) {
-                        // This is before "prepare" is called on components. 
-                        nodeTableCache.updateCommit();
-                    }
-                }
-            };
-            listeners.add(listener);
+            listeners.add(nodeTableCache);
         }
         
         nodeTable = NodeTableInline.create(nodeTable);
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
index 005fecc..0d3d7fd 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
@@ -27,6 +27,8 @@
 import org.apache.jena.atlas.lib.CacheFactory;
 import org.apache.jena.atlas.lib.Pair;
 import org.apache.jena.atlas.logging.Log;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionListener;
 import org.apache.jena.graph.Node;
 import org.apache.jena.tdb2.TDBException;
 import org.apache.jena.tdb2.params.StoreParams;
@@ -37,12 +39,12 @@
  * wrapper. Read-cache - write caching is done via the object file used by the
  * base NodeTable.
  */
-public class NodeTableCache implements NodeTable {
+public class NodeTableCache implements NodeTable, TransactionListener {
     // These caches are updated together.
     // See synchronization in _retrieveNodeByNodeId and _idForNode
     // The cache is assumed to be single operation-thread-safe.
 
-    // The buffering is for updates. Only the updating thread will see changes due to new nodes   
+    // The buffering is for updates. Only the updating thread will see changes due to new nodes
     // Case 1: Not in main "not-present"
     //  Add to local "not-present", flush down.
 
@@ -51,17 +53,18 @@
     //   Write back updates "not-present"
     //   Depends on "not-rpesent" used to protect the underlying "not-present"
 
-    
-    
+
+
     private ThreadBufferingCache<Node, NodeId> node2id_Cache = null;
     private ThreadBufferingCache<NodeId, Node> id2node_Cache = null;
 
     // A small cache of "known unknowns" to speed up searching for impossible things.
     // Cache update needed on NodeTable changes because a node may become "known"
-    private ThreadBufferingCache<Node, Object> notPresent    = null;
+    private Cache<Node, Object> notPresent    = null; // plain (not thread-buffered) cache, null when disabled; puts are gated by inTopLevelTxn()
     private NodeTable           baseTable;
     private final Object        lock          = new Object();
-    
+    private volatile Thread     writingThread; // thread running the active write transaction, or null (set in updateStart, cleared in updateAbort/updateCommit)
+
     public static NodeTable create(NodeTable nodeTable, StoreParams params) {
         int nodeToIdCacheSize = params.getNode2NodeIdCacheSize();
         int idToNodeCacheSize = params.getNodeId2NodeCacheSize();
@@ -83,14 +86,14 @@
         if ( idToNodeCacheSize > 0 )
             id2node_Cache = createCache("idToNode", idToNodeCacheSize, 1000);
         if ( nodeMissesCacheSize > 0 )
-            notPresent = createCache("notPresent", nodeMissesCacheSize, 100);
+            notPresent = CacheFactory.createCache(nodeMissesCacheSize);
     }
 
     private static <Key, Value> ThreadBufferingCache<Key, Value> createCache(String label, int mainCachesize, int bufferSize) {
         Cache<Key, Value> cache = CacheFactory.createCache(mainCachesize);
         return new ThreadBufferingCache<>(label, cache, bufferSize);
     }
-    
+
     // ---- Cache access, no going to underlying table.
 
     public Node getNodeForNodeIdCache(NodeId id) {
@@ -220,7 +223,7 @@
                 nodeId = baseTable.getAllocateNodeId(node);
             else {
                 if ( notPresent(node) )
-                    // Known not be in the baseTable. 
+                    // Known not be in the baseTable.
                     return NodeId.NodeDoesNotExist;
                 else
                     nodeId = baseTable.getNodeIdForNode(node);
@@ -234,7 +237,7 @@
     // ----------------
     // ---- Only places that the caches are touched
 
-    /** 
+    /**
      * Test whether in the "not present" cache.
      * True means "known to be absent from the baseTable".
      */
@@ -262,14 +265,6 @@
         // Remember things known (currently) not to exist.
         // Does not matter if notPresent is being updated elsewhere.
         return node2id_Cache.getIfPresent(node);
-
-        // XXX [467]
-        // Pre JENA-1467 - can be deleted.
-//        if ( notPresent != null && notPresent.containsKey(node) )
-//            return null;
-//        if ( node2id_Cache == null )
-//            return null;
-//        return node2id_Cache.getIfPresent(node);
     }
 
     /** Update the Node&lt;-&gt;NodeId caches */
@@ -281,8 +276,9 @@
         // The "notPresent" cache is used to note whether a node
         // is known not to exist in the baseTable..
         // This must be specially handled later if the node is added.
+        // Only top-level transactions can add nodes to the "notPresent" cache.
         if ( NodeId.isDoesNotExist(id) ) {
-            if ( notPresent != null )
+            if ( notPresent != null && inTopLevelTxn())
                 notPresent.put(node, Boolean.TRUE);
             return;
         }
@@ -301,40 +297,70 @@
             notPresent.remove(node);
     }
 
+    // True when the calling thread may record "known absent" nodes in the shared notPresent cache:
+    // either no write transaction is active, or the caller is the thread running it.
+    // NOTE(review): a reader whose snapshot predates the latest commit also passes once writingThread is cleared - confirm.
+    private boolean inTopLevelTxn() {
+        final Thread activeWriter = writingThread;   // read the volatile field exactly once
+        return activeWriter == null || Thread.currentThread() == activeWriter;
+    }
+
+    @Override public void notifyTxnStart(Transaction transaction) {
+        // A write transaction is starting: buffer node-cache updates and record the writer thread.
+        if ( !transaction.isWriteTxn() ) return;
+        updateStart();
+    }
+
+    @Override public void notifyPromoteFinish(Transaction transaction) {
+        // Promotion to a write transaction finished: treat it like the start of an update.
+        if ( !transaction.isWriteTxn() ) return;
+        updateStart();
+    }
+
+    @Override public void notifyCompleteFinish(Transaction transaction) {
+        // A write transaction has completed: move buffered entries into the main caches.
+        // NOTE(review): confirm whether "complete" also follows an abort; updateAbort will already have dropped the buffers then.
+        if ( !transaction.isWriteTxn() ) return;
+        updateCommit();
+    }
+
+    @Override public void notifyAbortStart(Transaction transaction) {
+        // A write transaction is aborting: discard the buffered node-cache updates.
+        if ( !transaction.isWriteTxn() ) return;
+        updateAbort();
+    }
+
     // ----
 
     // The cache is "optimistic" - nodes are added during the transaction.
-    // It does not matter if they get added (and visible earlier) 
+    // It does not matter if they get added (and visible earlier)
     // because this is nothing more than "preallocation". Triples (Tuple of NodeIds) don't match.
-    
+
     // Underlying file has them "transactionally".
-    
+
     // On abort, it does need to be undone because the underlying NodeTable
     // being cached will not have them.
-    
-    public void updateStart() {
-        //System.out.println("updateStart: "+baseTable.toString());
+    /** Start of a write transaction (or promotion): buffer node-cache updates and record the writing thread. */
+    private void updateStart() { // NOTE(review): NPE if a cache is null (the constructor creates caches only for positive sizes) - TODO confirm
         node2id_Cache.enableBuffering();
         id2node_Cache.enableBuffering();
-        notPresent.enableBuffering();
+        writingThread = Thread.currentThread(); // from now on only this thread passes inTopLevelTxn()
     }
-    
-    public void updateAbort() {
-        //System.out.println("updateAbort: "+baseTable.toString());
+    /** Abort of a write transaction: clear the writer and discard buffered node-cache updates. */
+    private void updateAbort() {
+        writingThread = null; // no writer active: any thread may again add to "notPresent" (see inTopLevelTxn)
+        // Buffered entries refer to nodes the aborted transaction never committed to the base table.
         node2id_Cache.dropBuffer();
         id2node_Cache.dropBuffer();
-        notPresent.dropBuffer();
     }
-    
-    public void updateCommit() {
-        //System.out.println("updateCommit: "+baseTable.toString());
-        // Already in the baseTable.
+    /** End of a write transaction: clear the writer and publish buffered entries to the main caches. */
+    private void updateCommit() {
+        writingThread = null; // no writer active: any thread may again add to "notPresent" (see inTopLevelTxn)
         // Write to main caches.
         node2id_Cache.flushBuffer();
         id2node_Cache.flushBuffer();
-        notPresent.flushBuffer();
     }
-    
+
     @Override
     public boolean isEmpty() {
         synchronized (lock) {
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java
new file mode 100644
index 0000000..0ac25c0
--- /dev/null
+++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.store;
+
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.rdf.model.Resource;
+import org.apache.jena.rdf.model.ResourceFactory;
+import org.apache.jena.system.Txn;
+import org.apache.jena.tdb2.TDB2Factory;
+import org.apache.jena.vocabulary.RDFS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestVisibilityOfChanges {
+    private Dataset dataset;
+
+    @Before
+    public void before() {
+        dataset = TDB2Factory.connectDataset(Location.mem());
+    }
+
+    @After
+    public void after() {
+        dataset.close();
+    }
+
+    @Test
+    public void visibilityOfNewlyCreatedResources() {
+        Resource resource = ResourceFactory.createResource();
+        assertFalse(Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+        Txn.executeWrite(dataset, () -> {
+            dataset.getDefaultModel().add(resource, RDFS.label, "I exist!");
+            assertTrue(dataset.getDefaultModel().containsResource(resource));
+            rinseCache();
+            assertTrue("A newly created resource should be visible within the write transaction",
+                    dataset.getDefaultModel().containsResource(resource));
+        });
+        assertTrue("A newly created resource should be visible after commit",
+                Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+    }
+
+    @Test
+    public void testIsolation() {
+        Resource resource = ResourceFactory.createResource();
+        Txn.executeWrite(dataset, () -> {
+            assertFalse(dataset.getDefaultModel().containsResource(resource));
+            dataset.getDefaultModel().add(resource, RDFS.label, "I exist!");
+            rinseCache();
+            executeAndWait(() -> Txn.executeRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+            assertTrue("A read transaction should not pollute the nonPresent cache if there's a write transaction",
+                    dataset.getDefaultModel().containsResource(resource));
+        });
+    }
+
+    /** Adds many fresh nodes so that earlier cache entries are (presumably) evicted from the node caches. */
+    private void rinseCache() {
+        int cacheSize = 2000;
+        for (int i = 0; i < cacheSize; i++) {
+            Resource newResource = ResourceFactory.createResource();
+            dataset.getDefaultModel().add(newResource, RDFS.label, "");
+        }
+    }
+    /** Runs {@code runnable} on a new thread, waits for it, and rethrows anything it threw so the test fails. */
+    private void executeAndWait(Runnable runnable) {
+        Throwable[] failure = new Throwable[1];
+        Thread thread = new Thread(runnable);
+        thread.setUncaughtExceptionHandler((t, ex) -> failure[0] = ex);
+        thread.start();
+        try { thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); fail("Interrupted while waiting for worker thread"); }
+        if (failure[0] != null)
+            throw new AssertionError("Worker thread failed", failure[0]);
+    }
+}
\ No newline at end of file