Merge pull request #695 from afs/jena1758-semaphore

JENA-1758: Use a semaphore to signal that the thread has finished
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
index a0886a9..24b30bb 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
@@ -18,10 +18,13 @@
 
 package org.apache.jena.tdb2.loader.main;
 
+import static org.apache.jena.tdb2.loader.main.PhasedOps.acquire;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -101,6 +104,9 @@
         }
     }
 
+    /** Semaphore for the other thread to indicate it has finished. */
+    private final Semaphore termination = new Semaphore(0);
+
     @Override
     public void startBulk() {
         thread = new Thread(()->action());
@@ -109,11 +115,7 @@
 
     @Override
     public void finishBulk() {
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new BulkLoaderException("InterruptedException", e);
-        }
+        acquire(termination); // Blocks until the worker releases 'termination'. NOTE(review): interruption now raises RuntimeException (from PhasedOps.acquire), not BulkLoaderException.
     }
 
     // Triples.
@@ -155,6 +157,7 @@
         }
         transaction.end();
         CoLib.finish(coordinator);
+        termination.release();
     }
 
     //@Override
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
index 02cd56f..54aef8f 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
@@ -18,6 +18,8 @@
 
 package org.apache.jena.tdb2.loader.main;
 
+import static org.apache.jena.tdb2.loader.main.PhasedOps.acquire;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -25,7 +27,6 @@
 import java.util.concurrent.Semaphore;
 
 import org.apache.jena.atlas.lib.ArrayUtils;
-import org.apache.jena.atlas.lib.Timer;
 import org.apache.jena.atlas.lib.tuple.Tuple;
 import org.apache.jena.atlas.logging.Log;
 import org.apache.jena.dboe.transaction.txn.Transaction;
@@ -65,16 +66,6 @@
         }
     }
 
-    private static long acquire(Semaphore semaphore, int numPermits) {
-        return Timer.time(()->{
-            try { semaphore.acquire(numPermits); }
-            catch (InterruptedException e) {
-                Log.error(Indexer.class, "Interrupted", e);
-                throw new RuntimeException(e);
-            }
-        });
-    }
-
     /** Return a function that delivers multiple {@code List<Tuple<NodeId>>>} to this indexer */
     public Destination<Tuple<NodeId>> index() {
         return this::index;
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java
index 7ee8d3e..852127e 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/PhasedOps.java
@@ -19,11 +19,14 @@
 package org.apache.jena.tdb2.loader.main;
 
 import java.util.*;
+import java.util.concurrent.Semaphore;
 import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.stream.Stream;
 
+import org.apache.jena.atlas.lib.Timer;
 import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.atlas.logging.Log;
 import org.apache.jena.tdb2.loader.BulkLoaderException;
 import org.apache.jena.tdb2.loader.base.MonitorOutput;
 import org.apache.jena.tdb2.loader.base.ProgressMonitor;
@@ -37,6 +40,22 @@
  */
 class PhasedOps {
 
+    /** Acquire a single permit; delegates to {@link #acquire(Semaphore, int)} with a count of 1 and returns its result (the time spent waiting). */
+    /* package */ static long acquire(Semaphore termination) {
+        return acquire(termination, 1);
+    }
+
+    /** Acquire {@code numPermits} permits, blocking until they are available. Returns the value of {@code Timer.time} for the wait (presumably milliseconds - confirm against Timer). Interruption is logged and rethrown as a {@code RuntimeException}. */
+    /* package */ static long acquire(Semaphore semaphore, int numPermits) {
+        return Timer.time(()->{
+            try { semaphore.acquire(numPermits); }
+            catch (InterruptedException e) {
+                Log.error(Indexer.class, "Interrupted", e); // NOTE(review): still logs under Indexer, where this helper lived before moving to PhasedOps.
+                throw new RuntimeException(e); // NOTE(review): the thread's interrupt status is not restored before rethrowing.
+            }
+        });
+    }
+
     static Map<String, TupleIndex> indexMap(DatasetGraphTDB dsgtdb) {
         Map<String, TupleIndex> indexMap = new HashMap<>();
         // All triple/quad indexes.