[MINDEXER-188] Ensure that the producer thread doesn't indefinitely block on the queue (#314)
The consumer threads can quit early when an exception occurs. In that case the producer will usually block on `put` once the queue is full, without a chance to recover.
This change clears the queue and shuts the executor service down as soon as an exception is caught.
---
https://issues.apache.org/jira/browse/MINDEXER-188
diff --git a/.gitignore b/.gitignore
index 8b3bdd7..13fd7aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,10 @@
*.iml
.idea/
+# NetBeans
+nbactions.xml
+nb-configuration.xml
+
# Other
.svn/
bin/
diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
index 1b8c2a5..7cdbaf1 100644
--- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
+++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
@@ -31,8 +31,10 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -40,6 +42,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;
import org.apache.lucene.document.Document;
@@ -182,9 +185,10 @@
ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
ExecutorService executorService = Executors.newFixedThreadPool(threads);
- ArrayList<Exception> errors = new ArrayList<>();
- ArrayList<FSDirectory> siloDirectories = new ArrayList<>(threads);
- ArrayList<IndexWriter> siloWriters = new ArrayList<>(threads);
+ List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
+ List<FSDirectory> siloDirectories = new ArrayList<>(threads);
+ List<IndexWriter> siloWriters = new ArrayList<>(threads);
+ AtomicBoolean stopEarly = new AtomicBoolean(false);
LOGGER.debug("Creating {} silo writer threads...", threads);
for (int i = 0; i < threads; i++) {
final int silo = i;
@@ -201,8 +205,12 @@
break;
}
addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
- } catch (InterruptedException | IOException e) {
+ } catch (Throwable e) {
errors.add(e);
+ if (stopEarly.compareAndSet(false, true)) {
+ queue.clear(); // unblock producer
+ executorService.shutdownNow(); // unblock consumers
+ }
break;
}
}
@@ -215,7 +223,7 @@
LOGGER.debug("Loading up documents into silos");
try {
Document doc;
- while ((doc = readDocument()) != null) {
+ while (!stopEarly.get() && (doc = readDocument()) != null) {
queue.put(doc);
n++;
}
@@ -232,9 +240,15 @@
}
if (!errors.isEmpty()) {
- IOException exception = new IOException("Error during load of index");
- errors.forEach(exception::addSuppressed);
- throw exception;
+ if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
+ IOException exception = new IOException("Error during load of index");
+ errors.forEach(exception::addSuppressed);
+ throw exception;
+ } else {
+ RuntimeException exception = new RuntimeException("Error during load of index");
+ errors.forEach(exception::addSuppressed);
+ throw exception;
+ }
}
LOGGER.debug("Silos loaded...");
diff --git a/pom.xml b/pom.xml
index d50dd49..beb8452 100644
--- a/pom.xml
+++ b/pom.xml
@@ -366,6 +366,8 @@
<exclude>.git/**</exclude>
<exclude>.idea/**</exclude>
<exclude>**/*.iml</exclude>
+ <exclude>nbactions.xml</exclude>
+ <exclude>nb-configuration.xml</exclude>
<!-- exlude some test resources from rat analysis -->
<exclude>src/test/**/*.sha1</exclude>
<exclude>src/test/**/*.md5</exclude>