Merge pull request #672 from afs/jena1817_tdb2_node_cache
JENA-1817: Flush NodeTableCache in CommitFinish
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
index 42d2697..761c975 100644
--- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java
@@ -122,21 +122,16 @@
this(Journal.create(location));
}
- /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
+ /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent TransactionalComponents}. */
public TransactionCoordinator(Journal journal) {
this(journal, null , new ArrayList<>());
}
- /** Create a TransactionCoordinator, initially with {@link TransactionalComponent} in the ComponentGroup */
+ /** Create a TransactionCoordinator, initially with {@link TransactionalComponent TransactionalComponents} in the ComponentGroup */
public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) {
this(journal, components , new ArrayList<>());
}
- // /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */
-// public TransactionCoordinator(Location journalLocation) {
-// this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>());
-// }
-
private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) {
this.journal = journal;
this.shutdownHooks = new ArrayList<>(shutdownHooks);
@@ -467,7 +462,7 @@
/**
* Block until no writers are active, optionally blocking or returning if can't at the moment.
* <p>
- * Unlike a write transction, there is no associated transaction.
+ * Unlike a write transaction, there is no associated transaction.
* <p>
* If it returns true, the application must call {@link #enableWriters} later.
* @param canBlock
@@ -732,11 +727,6 @@
/*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) {
notifyCommitStart(transaction);
if ( transaction.getMode() == ReadWrite.READ ) {
-
- //[1746]
- //executeCommitReader();
- // No commit on components, all "end".
- // Make abort the same?
finish.run();
notifyCommitFinish(transaction);
return;
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
index aa02c25..ef18e84 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableCache.java
@@ -326,7 +326,7 @@
}
@Override
- public void notifyCompleteFinish(Transaction transaction) {
+ public void notifyCommitFinish(Transaction transaction) {
if(transaction.isWriteTxn()) {
updateCommit();
}
@@ -363,7 +363,6 @@
private void updateCommit() {
writingThread = null;
- // Write to main caches.
node2id_Cache.flushBuffer();
id2node_Cache.flushBuffer();
}
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
index b683671..6837d95 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/ThreadBufferingCache.java
@@ -32,8 +32,8 @@
/**
* A cache that buffers changes.
* <p>
- * It has two modes, when active it captures updates and the underlying main cache is
- * only updated when {@link #flushBuffer} is called. When not active, it passes
+ * It has two modes: when active, it captures updates and the underlying main cache is
+ * only updated when {@link #flushBuffer} is called; when not active, it passes
* updates straight through.
* <p>
* For access operations, it looks in the buffered cache and the underlying cache as
@@ -65,8 +65,8 @@
private boolean buffering() {
if ( ! BUFFERING )
return false;
- // Changes are sync'ed and the only way to change this value is via a sync'ed method.
- if ( bufferingThread == null )
+ // Changes are sync'ed externally and the only way to change this value is via a sync'ed method.
+ if ( bufferingThread.get() == null )
return false;
Thread currentThread = Thread.currentThread();
return bufferingThread.get() == currentThread;
@@ -93,16 +93,11 @@
public void flushBuffer() {
if ( ! buffering() )
return ;
- //System.out.println(label+": Flush:1 L: "+localCache().size());
- //System.out.println(label+": Flush:1 M: "+baseCache.size());
-
localCache().keys().forEachRemaining(k->{
Value value = localCache().getIfPresent(k);
baseCache.put(k, value);
});
localCache().clear();
- //System.out.println(label+": Flush:2 L: "+localCache().size());
- //System.out.println(label+": Flush:2 M: "+baseCache.size());
bufferingThread.set(null);
}
@@ -110,8 +105,6 @@
public void dropBuffer() {
if ( ! buffering() )
return ;
- //System.out.println(label+": Drop: L: "+localCache().size());
- //System.out.println(label+": Drop: M: "+baseCache.size());
localCache().clear();
bufferingThread.set(null);
}
@@ -164,12 +157,6 @@
return item;
}
- // ---- Flush changes, reset.
-
-
-
- // ---- Updates to buffering, local cache.
-
/** Goes into local cache. */
@Override
public void put(Key key, Value value) {
@@ -189,7 +176,6 @@
localCache().remove(key);
}
-
@Override
public Iterator<Key> keys() {
if ( ! buffering() )
diff --git a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java
index 211c6cf..d1af13c 100644
--- a/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java
+++ b/jena-db/jena-tdb2/src/test/java/org/apache/jena/tdb2/TestTDB2.java
@@ -19,8 +19,12 @@
package org.apache.jena.tdb2;
import java.io.ByteArrayOutputStream;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.jena.atlas.lib.FileOps;
+import org.apache.jena.atlas.lib.Lib;
+import org.apache.jena.atlas.lib.ThreadLib;
import org.apache.jena.query.TxnType;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
@@ -58,7 +62,6 @@
// Errors that can occur:
// One common term -> no conversion.
// Two common terms -> bad read.
- // Feature control: ThreadBufferingCache.BUFFERING
@Test public void abort1() {
Quad q1 = SSE.parseQuad("(:g :s :p :o)");
@@ -98,4 +101,53 @@
private static void output(DatasetGraph dsg) {
Txn.executeRead(dsg, ()->RDFDataMgr.write(new ByteArrayOutputStream(), dsg, Lang.NQUADS));
}
+
+ //JENA-1817: Two W txn, where the second queues on entry.
+ @Test public void multiple_writers() {
+ Quad q1 = SSE.parseQuad("(:g :s :p :o1)");
+ Quad q2 = SSE.parseQuad("(:g :s :p :o2)");
+ DatasetGraph dsg = DatabaseMgr.createDatasetGraph();
+
+ // Test controls
+ Semaphore sema = new Semaphore(0);
+ Semaphore semaTestFinished = new Semaphore(0);
+
+ // Setup writers.
+ Runnable r1 = ()->{
+ Txn.executeWrite(dsg, ()->{
+ // Allow thread 2 run and try to enter the W txn
+ sema.release(1);
+ dsg.add(q1);
+ // Gives thread2 a chance to enter (can't do this by lock).
+ // It is unfortunate that it's a timeout.
+ Lib.sleep(250);
+ });
+ // Finished.
+ semaTestFinished.release(1);
+ };
+
+ Runnable r2 = ()->{
+ acquire(sema,1);
+ // Thread 1 is now inside its W txn.
+ Txn.executeWrite(dsg, () -> dsg.add(q2));
+ semaTestFinished.release(1);
+ };
+ ThreadLib.async(r2);
+ ThreadLib.async(r1);
+
+ // Trigger writers.
+ sema.release(2);
+ // Wait until test threads have finished
+ acquire(semaTestFinished, 2);
+ }
+
+ private static void acquire(Semaphore semaphore, int permits) {
+ try {
+ boolean b = semaphore.tryAcquire(permits, 1000, TimeUnit.MILLISECONDS);
+ if ( !b )
+ throw new RuntimeException("Test failure - did not get permits in the time allowed");
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
}