LUCENE-8989: Allow IndexSearcher To Handle Rejected Execution (#899)

When executing queries using Executors, we should gracefully handle
the case when Executor rejects a task and run the task on the caller
thread
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index bc1364e..3a23f06 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
@@ -667,7 +668,13 @@
           search(Arrays.asList(leaves), weight, collector);
           return collector;
         });
-        executor.execute(task);
+        try {
+          executor.execute(task);
+        } catch (RejectedExecutionException e) {
+          // Execute on caller thread
+          search(Arrays.asList(leaves), weight, collector);
+        }
+
         topDocsFutures.add(task);
       }
       final LeafReaderContext[] leaves = leafSlices[leafSlices.length - 1].leaves;
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
index a168aac..d63d3e7 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
@@ -20,11 +20,17 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.document.Document;
@@ -265,4 +271,86 @@
       assertEquals(leaves.size() - 1, numExecutions.get());
     }
   }
+
+  public void testRejectedExecution() throws IOException {
+    List<LeafReaderContext> leaves = reader.leaves();
+    AtomicInteger numExecutions = new AtomicInteger(0);
+    ExecutorService service = new RejectingMockExecutor();
+
+    IndexSearcher searcher = new IndexSearcher(reader, service) {
+      @Override
+      protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+        ArrayList<LeafSlice> slices = new ArrayList<>();
+        for (LeafReaderContext ctx : leaves) {
+          slices.add(new LeafSlice(Arrays.asList(ctx)));
+        }
+        return slices.toArray(new LeafSlice[0]);
+      }
+    };
+
+    // To ensure that failing ExecutorService still allows query to run
+    // successfully
+    searcher.search(new MatchAllDocsQuery(), 10);
+    if (leaves.size() <= 1) {
+      assertEquals(0, numExecutions.get());
+    } else {
+      assertEquals(leaves.size() - 1, numExecutions.get());
+    }
+
+    service.shutdown();
+  }
+
+  private static class RejectingMockExecutor implements ExecutorService {
+
+    public void shutdown() {
+    }
+
+    public List<Runnable> shutdownNow() {
+      throw new UnsupportedOperationException();
+    }
+
+    public boolean isShutdown() {
+      throw new UnsupportedOperationException();
+    }
+
+    public boolean isTerminated() {
+      throw new UnsupportedOperationException();
+    }
+
+    public boolean awaitTermination(final long l, final TimeUnit timeUnit) throws InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    public <T> Future<T> submit(final Callable<T> tCallable) {
+      throw new UnsupportedOperationException();
+    }
+
+    public <T> Future<T> submit(final Runnable runnable, final T t) {
+      throw new UnsupportedOperationException();
+    }
+
+    public Future<?> submit(final Runnable runnable) {
+      throw  new UnsupportedOperationException();
+    }
+
+    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables) throws InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit) throws InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    public <T> T invokeAny(final Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException {
+      throw new UnsupportedOperationException();
+    }
+
+    public <T> T invokeAny(final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void execute(final Runnable runnable) {
+      throw new RejectedExecutionException();
+    }
+  }
 }