LUCENE-8989: Allow IndexSearcher To Handle Rejected Execution (#899)
When executing queries using Executors, we should gracefully handle
the case when Executor rejects a task and run the task on the caller
thread
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index bc1364e..3a23f06 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -31,6 +31,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@@ -667,7 +668,13 @@
search(Arrays.asList(leaves), weight, collector);
return collector;
});
- executor.execute(task);
+ try {
+ executor.execute(task);
+ } catch (RejectedExecutionException e) {
+ // Execute on caller thread
+ search(Arrays.asList(leaves), weight, collector);
+ }
+
topDocsFutures.add(task);
}
final LeafReaderContext[] leaves = leafSlices[leafSlices.length - 1].leaves;
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
index a168aac..d63d3e7 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
@@ -20,11 +20,17 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
@@ -265,4 +271,86 @@
assertEquals(leaves.size() - 1, numExecutions.get());
}
}
+
+ public void testRejectedExecution() throws IOException {
+ List<LeafReaderContext> leaves = reader.leaves();
+ AtomicInteger numExecutions = new AtomicInteger(0);
+ ExecutorService service = new RejectingMockExecutor();
+
+ IndexSearcher searcher = new IndexSearcher(reader, service) {
+ @Override
+ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+ ArrayList<LeafSlice> slices = new ArrayList<>();
+ for (LeafReaderContext ctx : leaves) {
+ slices.add(new LeafSlice(Arrays.asList(ctx)));
+ }
+ return slices.toArray(new LeafSlice[0]);
+ }
+ };
+
+ // To ensure that failing ExecutorService still allows query to run
+ // successfully
+ searcher.search(new MatchAllDocsQuery(), 10);
+ if (leaves.size() <= 1) {
+ assertEquals(0, numExecutions.get());
+ } else {
+ assertEquals(leaves.size() - 1, numExecutions.get());
+ }
+
+ service.shutdown();
+ }
+
+ private static class RejectingMockExecutor implements ExecutorService {
+
+ public void shutdown() {
+ }
+
+ public List<Runnable> shutdownNow() {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isShutdown() {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isTerminated() {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean awaitTermination(final long l, final TimeUnit timeUnit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Future<T> submit(final Callable<T> tCallable) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Future<T> submit(final Runnable runnable, final T t) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Future<?> submit(final Runnable runnable) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(final Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void execute(final Runnable runnable) {
+ throw new RejectedExecutionException();
+ }
+ }
}