Merge pull request #672 from afs/jena1817_tdb2_node_cache

JENA-1817: Flush NodeTableCache in CommitFinish
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
index 42d2697..761c975 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
@@ -122,21 +122,16 @@
         this(Journal.create(location));
     }
 
-    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
+    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent TransactionalComponents}. */
     public TransactionCoordinator(Journal journal) {
         this(journal, null , new ArrayList<>());
     }
 
-    /** Create a TransactionCoordinator, initially with {@link TransactionalComponent} in the ComponentGroup */
+    /** Create a TransactionCoordinator, initially with {@link TransactionalComponent TransactionalComponents} in the ComponentGroup. */
     public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) {
         this(journal, components , new ArrayList<>());
     }
 
-    //    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
-//    public TransactionCoordinator(Location journalLocation) {
-//        this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>());
-//    }
-
     private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) {
         this.journal = journal;
         this.shutdownHooks = new ArrayList<>(shutdownHooks);
@@ -467,7 +462,7 @@
     /**
      * Block until no writers are active, optionally blocking or returning if can't at the moment.
      * <p>
-     * Unlike a write transction, there is no associated transaction.
+     * Unlike a write transaction, there is no associated transaction.
      * <p>
      * If it returns true, the application must call {@link #enableWriters} later.
      * @param canBlock
@@ -732,11 +727,6 @@
     /*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) {
         notifyCommitStart(transaction);
         if ( transaction.getMode() == ReadWrite.READ ) {
-
-            //[1746]
-            //executeCommitReader();
-            // No commit on components, all "end".
-            // Make abort the same?
             finish.run();
             notifyCommitFinish(transaction);
             return;
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
index aa02c25..ef18e84 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
@@ -326,7 +326,7 @@
     }
 
     @Override
-    public void notifyCompleteFinish(Transaction transaction) {
+    public void notifyCommitFinish(Transaction transaction) {
         if(transaction.isWriteTxn()) {
             updateCommit();
         }
@@ -363,7 +363,6 @@
 
     private void updateCommit() {
         writingThread = null;
-        // Write to main caches.
         node2id_Cache.flushBuffer();
         id2node_Cache.flushBuffer();
     }
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
index b683671..6837d95 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
@@ -32,8 +32,8 @@
 /**
  * A cache that buffers changes.
  * <p>
- * It has two modes, when active it captures updates and the underlying main cache is
- * only updated when {@link #flushBuffer} is called. When not active, it passes
+ * It has two modes: when active, it captures updates and the underlying main cache is
+ * only updated when {@link #flushBuffer} is called; when not active, it passes
  * updates straight through.
  * <p>
  * For access operations, it looks in the buffered cache and the underlying cache as
@@ -65,8 +65,8 @@
     private boolean buffering() {
         if ( ! BUFFERING )
             return false;
-        // Changes are sync'ed and the only way to change this value is via a sync'ed method.
-        if ( bufferingThread == null )
+        // Changes are sync'ed externally and the only way to change this value is via a sync'ed method.
+        if ( bufferingThread.get() == null )
             return false;
         Thread currentThread = Thread.currentThread();
         return bufferingThread.get() == currentThread;
@@ -93,16 +93,11 @@
     public void flushBuffer() {
         if ( ! buffering() )
             return ;
-        //System.out.println(label+": Flush:1 L: "+localCache().size());
-        //System.out.println(label+": Flush:1 M: "+baseCache.size());
-
         localCache().keys().forEachRemaining(k->{
             Value value = localCache().getIfPresent(k);
             baseCache.put(k, value);
         });
         localCache().clear();
-        //System.out.println(label+": Flush:2 L: "+localCache().size());
-        //System.out.println(label+": Flush:2 M: "+baseCache.size());
         bufferingThread.set(null);
     }
 
@@ -110,8 +105,6 @@
     public void dropBuffer() {
         if ( ! buffering() )
             return ;
-        //System.out.println(label+": Drop: L: "+localCache().size());
-        //System.out.println(label+": Drop: M: "+baseCache.size());
         localCache().clear();
         bufferingThread.set(null);
     }
@@ -164,12 +157,6 @@
         return item;
     }
 
-    // ---- Flush changes, reset.
-
-
-
-    // ---- Updates to buffering, local cache.
-
     /** Goes into local cache. */
     @Override
     public void put(Key key, Value value) {
@@ -189,7 +176,6 @@
         localCache().remove(key);
     }
 
-
     @Override
     public Iterator<Key> keys() {
         if ( ! buffering() )
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java
index 211c6cf..d1af13c 100644
--- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java
+++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java
@@ -19,8 +19,12 @@
 package org.apache.jena.tdb2;
 
 import java.io.ByteArrayOutputStream;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.jena.atlas.lib.FileOps;
+import org.apache.jena.atlas.lib.Lib;
+import org.apache.jena.atlas.lib.ThreadLib;
 import org.apache.jena.query.TxnType;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFDataMgr;
@@ -58,7 +62,6 @@
     // Errors that can occur:
     //   One common term -> no conversion.
     //   Two common terms -> bad read.
-    // Feature control: ThreadBufferingCache.BUFFERING
 
     @Test public void abort1() {
         Quad q1 = SSE.parseQuad("(:g :s :p :o)");
@@ -98,4 +101,53 @@
     private static void output(DatasetGraph dsg) {
         Txn.executeRead(dsg, ()->RDFDataMgr.write(new ByteArrayOutputStream(),  dsg, Lang.NQUADS));
     }
+    
+    // JENA-1817: Two W txn, where the second queues on entry.
+    @Test public void multiple_writers() {
+        Quad q1 = SSE.parseQuad("(:g :s :p :o1)");
+        Quad q2 = SSE.parseQuad("(:g :s :p :o2)");
+        DatasetGraph dsg = DatabaseMgr.createDatasetGraph();
+
+        // Test controls: 'sema' gates thread 2, 'semaTestFinished' counts finished writers.
+        Semaphore sema =  new Semaphore(0);
+        Semaphore semaTestFinished =  new Semaphore(0);
+
+        // Setup writers.
+        Runnable r1 = ()->{
+            Txn.executeWrite(dsg,  ()->{
+                // Allow thread 2 to run and try to enter the W txn.
+                sema.release(1);
+                dsg.add(q1);
+                // Gives thread 2 a chance to enter (can't do this by lock).
+                // It is unfortunate that it's a timeout.
+                Lib.sleep(250);
+            });
+            // Finished.
+            semaTestFinished.release(1);
+        };
+
+        Runnable r2 = ()->{
+            acquire(sema,1);
+            // Thread 1 is now inside its W txn.
+            Txn.executeWrite(dsg, () -> dsg.add(q2));
+            semaTestFinished.release(1);
+        };
+        ThreadLib.async(r2);
+        ThreadLib.async(r1);
+
+        // No release of 'sema' from this thread: the only permit comes from thread 1,
+        // inside its W txn, so thread 2 cannot start its own W txn before that point.
+        // Wait until test threads have finished
+        acquire(semaTestFinished, 2);
+    }
+    
+    /** Wait up to 1000ms for {@code permits}; throw if they are not obtained (timeout or interrupt). */
+    private static void acquire(Semaphore semaphore, int permits) {
+        boolean acquired = false;
+        // On interrupt, restore the flag and fall through to the failure below.
+        try { acquired = semaphore.tryAcquire(permits, 1000, TimeUnit.MILLISECONDS); }
+        catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
+        if ( !acquired )
+            throw new RuntimeException("Test failure - did not get permits in the time allowed");
+    }
 }