Merge pull request #695 from afs/jena1758-semaphore
JENA-1758: Use a semaphore to signal that the thread has finished
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
index a0886a9..24b30bb 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
@@ -18,10 +18,13 @@
package org.apache.jena.tdb2.loader.main;
+import static org.apache.jena.tdb2.loader.main.PhasedOps.acquire;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -101,6 +104,9 @@
}
}
+ /** Semaphore for the other thread to indicate it has finished. */
+ private final Semaphore termination = new Semaphore(0);
+
@Override
public void startBulk() {
thread = new Thread(()->action());
@@ -109,11 +115,7 @@
@Override
public void finishBulk() {
- try {
- thread.join();
- } catch (InterruptedException e) {
- throw new BulkLoaderException("InterruptedException", e);
- }
+ acquire(termination); // Wait for the other thread's termination.release(). NOTE(review): waits forever if that release is never reached - confirm it also runs on failure paths.
}
// Triples.
@@ -155,6 +157,7 @@
}
transaction.end();
CoLib.finish(coordinator);
+ termination.release();
}
//@Override
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
index 02cd56f..54aef8f 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
@@ -18,6 +18,8 @@
package org.apache.jena.tdb2.loader.main;
+import static org.apache.jena.tdb2.loader.main.PhasedOps.acquire;
+
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
@@ -25,7 +27,6 @@
import java.util.concurrent.Semaphore;
import org.apache.jena.atlas.lib.ArrayUtils;
-import org.apache.jena.atlas.lib.Timer;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.txn.Transaction;
@@ -65,16 +66,6 @@
}
}
- private static long acquire(Semaphore semaphore, int numPermits) {
- return Timer.time(()->{
- try { semaphore.acquire(numPermits); }
- catch (InterruptedException e) {
- Log.error(Indexer.class, "Interrupted", e);
- throw new RuntimeException(e);
- }
- });
- }
-
/** Return a function that delivers multiple {@code List<Tuple<NodeId>>>} to this indexer */
public Destination<Tuple<NodeId>> index() {
return this::index;
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java
index 7ee8d3e..852127e 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java
@@ -19,11 +19,14 @@
package org.apache.jena.tdb2.loader.main;
import java.util.*;
+import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.stream.Stream;
+import org.apache.jena.atlas.lib.Timer;
import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.atlas.logging.Log;
import org.apache.jena.tdb2.loader.BulkLoaderException;
import org.apache.jena.tdb2.loader.base.MonitorOutput;
import org.apache.jena.tdb2.loader.base.ProgressMonitor;
@@ -37,6 +40,22 @@
*/
class PhasedOps {
+ /** Acquire one permit from a semaphore, blocking until it is available. Return the time spent waiting. Shorthand for {@code acquire(termination, 1)}. */
+ /* package */ static long acquire(Semaphore termination) {
+ return acquire(termination, 1);
+ }
+
+ /** Acquire permits from a semaphore, blocking until they are available. Return the time spent waiting (as measured by Timer.time). If interrupted, logs, restores the interrupt flag and throws RuntimeException. */
+ /* package */ static long acquire(Semaphore semaphore, int numPermits) {
+ return Timer.time(()->{
+ try { semaphore.acquire(numPermits); } catch (InterruptedException e) {
+ Log.error(PhasedOps.class, "Interrupted", e); // Log under this class: the helper is shared, not Indexer-specific.
+ Thread.currentThread().interrupt(); // Restore the flag so code further up the stack can still observe the interrupt.
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
static Map<String, TupleIndex> indexMap(DatasetGraphTDB dsgtdb) {
Map<String, TupleIndex> indexMap = new HashMap<>();
// All triple/quad indexes.