MSQ: Wait for cleanup if stopping is interrupted. (#18557)

* MSQ: Wait for cleanup if stopping is interrupted.

If a worker is interrupted while it was already stopping, this could lead
to interruption of cleanup and leaking of resources. This patch updates
cleanup code to catch InterruptedException and then continue with cleanup.
After cleanup is done, the thread's interrupt flag will be set.

* Fix style.
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 3c8d772..8931b26 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -179,7 +179,7 @@
       setUpCompletionCallbacks();
     }
     catch (Throwable t) {
-      stopUnchecked(t);
+      stop(t);
     }
   }
 
@@ -193,7 +193,7 @@
    * @param t error to send to {@link RunWorkOrderListener#onFailure}, if success/failure has not already been sent.
    *          Will also be thrown at the end of this method.
    */
-  public void stop(@Nullable Throwable t) throws InterruptedException
+  public void stop(@Nullable Throwable t)
   {
     if (state.compareAndSet(State.INIT, State.STOPPING)
         || state.compareAndSet(State.STARTED, State.STOPPING)
@@ -236,34 +236,29 @@
       stopLatch.countDown();
     }
 
-    stopLatch.await();
+    // If stopLatch.await() is interrupted, remember that but keep waiting. This method should only return when
+    // the worker is fully stopped, otherwise cleanup may not have fully happened.
+    boolean interrupted = false;
+    while (stopLatch.getCount() > 0) {
+      try {
+        stopLatch.await();
+      }
+      catch (InterruptedException e) {
+        interrupted = true;
+      }
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
 
     if (t != null) {
-      Throwables.throwIfInstanceOf(t, InterruptedException.class);
       Throwables.throwIfUnchecked(t);
       throw new RuntimeException(t);
     }
   }
 
   /**
-   * Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)} throws {@link InterruptedException},
-   * this method sets the interrupt flag and throws an unchecked exception.
-   *
-   * @param t error to send to {@link RunWorkOrderListener#onFailure}, if success/failure has not already been sent.
-   *          Will also be thrown at the end of this method.
-   */
-  public void stopUnchecked(@Nullable final Throwable t)
-  {
-    try {
-      stop(t);
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
    * Settable {@link ClusterByPartitions} future for global sort. Necessary because we don't know ahead of time
    * what the boundaries will be. The controller decides based on statistics from all workers. Once the controller
    * decides, its decision is written to this future, which allows sorting on workers to proceed.
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 66d7fb1..af2f145 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -427,7 +427,7 @@
     );
 
     // Set up processorCloser (called when processing is done).
-    kernelHolder.processorCloser.register(() -> runWorkOrder.stopUnchecked(null));
+    kernelHolder.processorCloser.register(() -> runWorkOrder.stop(null));
 
     // Start working on this stage immediately.
     kernel.startReading();
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
index e3c1f1f..f14e007 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
@@ -32,7 +32,7 @@
   private static final String CANCELLATION_ID = "my-cancellation-id";
 
   @Test
-  public void test_stopUnchecked() throws InterruptedException
+  public void test_stop()
   {
     final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
     final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
@@ -45,10 +45,10 @@
     final RunWorkOrder runWorkOrder =
         new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener);
 
-    runWorkOrder.stopUnchecked(null);
+    runWorkOrder.stop(null);
 
     // Calling a second time doesn't do anything special.
-    runWorkOrder.stopUnchecked(null);
+    runWorkOrder.stop(null);
 
     Mockito.verify(exec).cancel(CANCELLATION_ID);
     Mockito.verify(frameContext).close();
@@ -56,7 +56,7 @@
   }
 
   @Test
-  public void test_stopUnchecked_error() throws InterruptedException
+  public void test_stop_error()
   {
     final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
     final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
@@ -73,11 +73,11 @@
 
     Assert.assertThrows(
         IllegalStateException.class,
-        () -> runWorkOrder.stopUnchecked(exception)
+        () -> runWorkOrder.stop(exception)
     );
 
     // Calling a second time doesn't do anything special. We already tried our best.
-    runWorkOrder.stopUnchecked(null);
+    runWorkOrder.stop(null);
 
     Mockito.verify(exec).cancel(CANCELLATION_ID);
     Mockito.verify(frameContext).close();
@@ -85,7 +85,7 @@
   }
 
   @Test
-  public void test_stopUnchecked_errorDuringExecCancel() throws InterruptedException
+  public void test_stop_errorDuringExecCancel()
   {
     final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
     final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
@@ -102,7 +102,7 @@
 
     Assert.assertThrows(
         IllegalStateException.class,
-        () -> runWorkOrder.stopUnchecked(null)
+        () -> runWorkOrder.stop(null)
     );
 
     Mockito.verify(exec).cancel(CANCELLATION_ID);
@@ -111,7 +111,7 @@
   }
 
   @Test
-  public void test_stopUnchecked_errorDuringFrameContextClose() throws InterruptedException
+  public void test_stop_errorDuringFrameContextClose()
   {
     final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class);
     final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
@@ -128,7 +128,7 @@
 
     Assert.assertThrows(
         IllegalStateException.class,
-        () -> runWorkOrder.stopUnchecked(null)
+        () -> runWorkOrder.stop(null)
     );
 
     Mockito.verify(exec).cancel(CANCELLATION_ID);
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index abcff13..882f5c8 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -402,12 +402,7 @@
             }
 
             if (didRemoveFromCancelableProcessors) {
-              try {
-                cancel(Collections.singleton(processor));
-              }
-              catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-              }
+              cancel(Collections.singleton(processor));
             }
           }
         },
@@ -464,7 +459,7 @@
    * Deregisters a cancellationId and cancels any currently-running processors associated with that cancellationId.
    * Waits for any canceled processors to exit before returning.
    */
-  public void cancel(final String cancellationId) throws InterruptedException
+  public void cancel(final String cancellationId)
   {
     Preconditions.checkNotNull(cancellationId, "cancellationId");
 
@@ -577,8 +572,9 @@
    * Logs (but does not throw) exceptions encountered while running {@link FrameProcessor#cleanup()}.
    */
   private void cancel(final Set<FrameProcessor<?>> processorsToCancel)
-      throws InterruptedException
   {
+    boolean interrupted = false;
+
     synchronized (lock) {
       for (final FrameProcessor<?> processor : processorsToCancel) {
         final Thread processorThread = runningProcessors.get(processor);
@@ -591,7 +587,14 @@
 
       // Wait for all running processors to stop running. Then clean them up outside the critical section.
       while (anyIsRunning(processorsToCancel)) {
-        lock.wait();
+        // If lock.wait() is interrupted, remember that but keep waiting. This method needs to proceed onwards
+        // even when interrupted, to ensure processor cleanup happens.
+        try {
+          lock.wait();
+        }
+        catch (InterruptedException e) {
+          interrupted = true;
+        }
       }
     }
 
@@ -616,6 +619,10 @@
         log.noStackTrace().warn(e, "Exception encountered while canceling processor [%s]", processor);
       }
     }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
   }
 
   private <T> void registerCancelableProcessor(final FrameProcessor<T> processor, @Nullable final String cancellationId)