[MINDEXER-188] Ensure that the producer thread doesn't indefinitely block on the queue (#314)

The consumer Threads can quit early when an exception occurs. In that case the consumer will usually block on `put` once the queue is full without a chance to recover.

This change clears the queue and shuts the executor service down as soon an exception is caught.

---

https://issues.apache.org/jira/browse/MINDEXER-188
diff --git a/.gitignore b/.gitignore
index 8b3bdd7..13fd7aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,10 @@
 *.iml
 .idea/
 
+# NetBeans
+nbactions.xml
+nb-configuration.xml
+
 # Other
 .svn/
 bin/
diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
index 1b8c2a5..7cdbaf1 100644
--- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
+++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
@@ -31,8 +31,10 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -40,6 +42,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.lucene.document.Document;
@@ -182,9 +185,10 @@
         ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
 
         ExecutorService executorService = Executors.newFixedThreadPool(threads);
-        ArrayList<Exception> errors = new ArrayList<>();
-        ArrayList<FSDirectory> siloDirectories = new ArrayList<>(threads);
-        ArrayList<IndexWriter> siloWriters = new ArrayList<>(threads);
+        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
+        List<FSDirectory> siloDirectories = new ArrayList<>(threads);
+        List<IndexWriter> siloWriters = new ArrayList<>(threads);
+        AtomicBoolean stopEarly = new AtomicBoolean(false);
         LOGGER.debug("Creating {} silo writer threads...", threads);
         for (int i = 0; i < threads; i++) {
             final int silo = i;
@@ -201,8 +205,12 @@
                                 break;
                             }
                             addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
-                        } catch (InterruptedException | IOException e) {
+                        } catch (Throwable e) {
                             errors.add(e);
+                            if (stopEarly.compareAndSet(false, true)) {
+                                queue.clear(); // unblock producer
+                                executorService.shutdownNow(); // unblock consumers
+                            }
                             break;
                         }
                     }
@@ -215,7 +223,7 @@
         LOGGER.debug("Loading up documents into silos");
         try {
             Document doc;
-            while ((doc = readDocument()) != null) {
+            while (!stopEarly.get() && (doc = readDocument()) != null) {
                 queue.put(doc);
                 n++;
             }
@@ -232,9 +240,15 @@
         }
 
         if (!errors.isEmpty()) {
-            IOException exception = new IOException("Error during load of index");
-            errors.forEach(exception::addSuppressed);
-            throw exception;
+            if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
+                IOException exception = new IOException("Error during load of index");
+                errors.forEach(exception::addSuppressed);
+                throw exception;
+            } else {
+                RuntimeException exception = new RuntimeException("Error during load of index");
+                errors.forEach(exception::addSuppressed);
+                throw exception;
+            }
         }
 
         LOGGER.debug("Silos loaded...");
diff --git a/pom.xml b/pom.xml
index d50dd49..beb8452 100644
--- a/pom.xml
+++ b/pom.xml
@@ -366,6 +366,8 @@
                 <exclude>.git/**</exclude>
                 <exclude>.idea/**</exclude>
                 <exclude>**/*.iml</exclude>
+                <exclude>nbactions.xml</exclude>
+                <exclude>nb-configuration.xml</exclude>
                 <!-- exlude some test resources from rat analysis -->
                 <exclude>src/test/**/*.sha1</exclude>
                 <exclude>src/test/**/*.md5</exclude>