JENA-1785: A newly created node can remain invisible after commit
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
index c3f2bfb..18b76cd 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/TDB2StorageBuilder.java
@@ -358,49 +358,7 @@
 //            components.add(tc);
 
             // [1746]
-            
-            TransactionListener listener = new TransactionListener() {
-                private final boolean PRT = false;
-                
-                @Override public void notifyTxnStart(Transaction transaction) { 
-                    if ( PRT ) System.out.println("notifyTxnStart");
-                    if ( transaction.isWriteTxn() )
-                        nodeTableCache.updateStart();
-                }   
-                
-                
-                @Override public void notifyPromoteFinish(Transaction transaction) {
-                    if ( transaction.isWriteTxn() )
-                        nodeTableCache.updateStart();
-                    if ( PRT ) System.out.println("notifyPromoteFinish");
-                }
-                
-                @Override public void notifyAbortStart(Transaction transaction) {
-                    if ( PRT ) System.out.println("notifyAbortStart");
-                    if ( transaction.isWriteTxn() ) {
-                        //System.out.println("    "+transaction.getTxnId());
-                        //System.out.println("    "+transaction.getMode());
-                        nodeTableCache.updateAbort();
-                    }
-                }
-                //@Override public void notifyAbortFinish(Transaction transaction) {} 
-
-                /** Start prepare during a commit */
-                @Override
-                public void notifyPrepareStart(Transaction transaction) {
-                    if ( PRT ) System.out.println("notifyPrepareStart");
-                }
-                
-                @Override public void notifyCommitFinish(Transaction transaction) { 
-                    // Tell the NodeTableCache to update the main caches. 
-                    // This must be after the underlying NodeTableNative has committed. 
-                    if ( transaction.isWriteTxn() ) {
-                        // This is before "prepare" is called on components. 
-                        nodeTableCache.updateCommit();
-                    }
-                }
-            };
-            listeners.add(listener);
+            listeners.add(nodeTableCache);
         }
         
         nodeTable = NodeTableInline.create(nodeTable);
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
index 005fecc..e847341 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
@@ -27,6 +27,8 @@
 import org.apache.jena.atlas.lib.CacheFactory;
 import org.apache.jena.atlas.lib.Pair;
 import org.apache.jena.atlas.logging.Log;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionListener;
 import org.apache.jena.graph.Node;
 import org.apache.jena.tdb2.TDBException;
 import org.apache.jena.tdb2.params.StoreParams;
@@ -37,7 +39,7 @@
  * wrapper. Read-cache - write caching is done via the object file used by the
  * base NodeTable.
  */
-public class NodeTableCache implements NodeTable {
+public class NodeTableCache implements NodeTable, TransactionListener {
     // These caches are updated together.
     // See synchronization in _retrieveNodeByNodeId and _idForNode
     // The cache is assumed to be single operation-thread-safe.
@@ -58,9 +60,12 @@
 
     // A small cache of "known unknowns" to speed up searching for impossible things.
     // Cache update needed on NodeTable changes because a node may become "known"
-    private ThreadBufferingCache<Node, Object> notPresent    = null;
+    private Cache<Node, Object> notPresent    = null;
     private NodeTable           baseTable;
     private final Object        lock          = new Object();
+    private final ThreadLocal<Transaction> txn = new ThreadLocal<>();
+    private long maxDataVersion;
+    private boolean hasActiveWriteTransaction;
     
     public static NodeTable create(NodeTable nodeTable, StoreParams params) {
         int nodeToIdCacheSize = params.getNode2NodeIdCacheSize();
@@ -83,7 +88,7 @@
         if ( idToNodeCacheSize > 0 )
             id2node_Cache = createCache("idToNode", idToNodeCacheSize, 1000);
         if ( nodeMissesCacheSize > 0 )
-            notPresent = createCache("notPresent", nodeMissesCacheSize, 100);
+            notPresent = CacheFactory.createCache(nodeMissesCacheSize);
     }
 
     private static <Key, Value> ThreadBufferingCache<Key, Value> createCache(String label, int mainCachesize, int bufferSize) {
@@ -281,8 +286,9 @@
         // The "notPresent" cache is used to note whether a node
         // is known not to exist in the baseTable..
         // This must be specially handled later if the node is added.
+        // Only top-level transactions can add nodes to the "notPresent" cache.
         if ( NodeId.isDoesNotExist(id) ) {
-            if ( notPresent != null )
+            if ( notPresent != null && inTopLevelTxn())
                 notPresent.put(node, Boolean.TRUE);
             return;
         }
@@ -301,6 +307,70 @@
             notPresent.remove(node);
     }
 
+    // A top-level transaction is either
+    // - a write transaction or
+    // - a read transaction with most recent data version given that there's no active write transaction.
+    private boolean inTopLevelTxn() {
+        Transaction t = txn.get();
+        return (t != null) && (t.isWriteTxn() || (!hasActiveWriteTransaction && t.getDataVersion() == maxDataVersion));
+    }
+
+    @Override
+    public void notifyTxnStart(Transaction transaction) {
+        txn.set(transaction);
+
+        synchronized (lock) {
+            maxDataVersion = Math.max(maxDataVersion, transaction.getDataVersion());
+            hasActiveWriteTransaction |= transaction.isWriteTxn();
+        }
+
+        if (transaction.isWriteTxn()) {
+            updateStart();
+        }
+
+    }
+
+    @Override
+    public void notifyPromoteFinish(Transaction transaction) {
+        if(transaction.isWriteTxn()) {
+            updateStart();
+            synchronized (lock) {
+                hasActiveWriteTransaction = true;
+            }
+        }
+    }
+
+    @Override
+    public void notifyCommitStart(Transaction transaction) {
+        if(transaction.isWriteTxn()) {
+            synchronized (lock) {
+                hasActiveWriteTransaction = false;
+            }
+        }
+    }
+
+    @Override
+    public void notifyCompleteFinish(Transaction transaction) {
+        if(transaction.isWriteTxn()) {
+            updateCommit();
+        }
+    }
+
+    @Override
+    public void notifyAbortStart(Transaction transaction) {
+        if(transaction.isWriteTxn()) {
+            synchronized (lock) {
+                hasActiveWriteTransaction = false;
+            }
+            updateAbort();
+        }
+    }
+
+    @Override
+    public void notifyTxnFinish(Transaction transaction) {
+        txn.remove();
+    }
+
     // ----
 
     // The cache is "optimistic" - nodes are added during the transaction.
@@ -312,27 +382,24 @@
     // On abort, it does need to be undone because the underlying NodeTable
     // being cached will not have them.
     
-    public void updateStart() {
+    private void updateStart() {
         //System.out.println("updateStart: "+baseTable.toString());
         node2id_Cache.enableBuffering();
         id2node_Cache.enableBuffering();
-        notPresent.enableBuffering();
     }
     
-    public void updateAbort() {
+    private void updateAbort() {
         //System.out.println("updateAbort: "+baseTable.toString());
         node2id_Cache.dropBuffer();
         id2node_Cache.dropBuffer();
-        notPresent.dropBuffer();
     }
     
-    public void updateCommit() {
+    private void updateCommit() {
         //System.out.println("updateCommit: "+baseTable.toString());
         // Already in the baseTable.
         // Write to main caches.
         node2id_Cache.flushBuffer();
         id2node_Cache.flushBuffer();
-        notPresent.flushBuffer();
     }
     
     @Override
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java
new file mode 100644
index 0000000..0ac25c0
--- /dev/null
+++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/store/TestVisibilityOfChanges.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.store;
+
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.rdf.model.Resource;
+import org.apache.jena.rdf.model.ResourceFactory;
+import org.apache.jena.system.Txn;
+import org.apache.jena.tdb2.TDB2Factory;
+import org.apache.jena.vocabulary.RDFS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestVisibilityOfChanges {
+    private Dataset dataset;
+
+    @Before
+    public void before() {
+        dataset = TDB2Factory.connectDataset(Location.mem());
+    }
+
+    @After
+    public void after() {
+        dataset.close();
+    }
+
+    @Test
+    public void visibilityOfNewlyCreatedResources() {
+        Resource resource = ResourceFactory.createResource();
+        assertFalse(Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+        Txn.executeWrite(dataset, () -> {
+            dataset.getDefaultModel().add(resource, RDFS.label, "I exist!");
+            assertTrue(dataset.getDefaultModel().containsResource(resource));
+            rinseCache();
+            assertTrue("A newly created resource should be visible within the write transaction",
+                    dataset.getDefaultModel().containsResource(resource));
+        });
+        assertTrue("A newly created resource should be visible after commit",
+                Txn.calculateRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+    }
+
+    @Test
+    public void testIsolation() {
+        Resource resource = ResourceFactory.createResource();
+        Txn.executeWrite(dataset, () -> {
+            assertFalse(dataset.getDefaultModel().containsResource(resource));
+            dataset.getDefaultModel().add(resource, RDFS.label, "I exist!");
+            rinseCache();
+            executeAndWait(() -> Txn.executeRead(dataset, () -> dataset.getDefaultModel().containsResource(resource)));
+            assertTrue("A read transaction should not pollute the nonPresent cache if there's a write transaction",
+                    dataset.getDefaultModel().containsResource(resource));
+        });
+    }
+
+
+    private void rinseCache() {
+        int cacheSize = 2000;
+        for (int i = 0; i < cacheSize; i++) {
+            Resource newResource = ResourceFactory.createResource();
+            dataset.getDefaultModel().add(newResource, RDFS.label, "");
+        }
+    }
+
+    private void executeAndWait(Runnable runnable) {
+        Thread thread = new Thread(runnable);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            fail();
+        }
+    }
+}
\ No newline at end of file