Error handling improvements for frame channels. (#12895)

* Error handling improvements for frame channels.

Two changes:

1) Send errors down in-memory channels (BlockingQueueFrameChannel) on
   failure. This ensures that in situations where a chain of processors
   has been set up on a single machine, all processors see the root
   cause error. In particular, this means the final processor in the
   chain reports the root cause error, which ensures that someone with
   a handle to the final processor will get the proper error.

2) Update FrameFileHttpResponseHandler to expect that the final fetch,
   rather than simply being empty, is empty and carries a special header.
   This ensures that the handler is able to tell the difference between
   an empty fetch due to being at EOF, and an empty fetch due to a
   truncated HTTP response (after the 200 OK and headers are sent down,
   but before any content appears).

* Fix tests, imports.

* Checkstyle!
diff --git a/core/src/main/java/org/apache/druid/java/util/common/Either.java b/core/src/main/java/org/apache/druid/java/util/common/Either.java
index 85fc625..71412ba 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/Either.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/Either.java
@@ -20,7 +20,6 @@
 package org.apache.druid.java.util.common;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -87,7 +86,8 @@
     if (isValue()) {
       return value;
     } else if (error instanceof Throwable) {
-      Throwables.propagateIfPossible((Throwable) error);
+      // Always wrap Throwable, even if we could throw it directly, to provide additional context
+      // about where the exception happened (we want the current stack frame in the trace).
       throw new RuntimeException((Throwable) error);
     } else {
       throw new RuntimeException(error.toString());
diff --git a/core/src/test/java/org/apache/druid/common/EitherTest.java b/core/src/test/java/org/apache/druid/common/EitherTest.java
index a91908e..bb081a2 100644
--- a/core/src/test/java/org/apache/druid/common/EitherTest.java
+++ b/core/src/test/java/org/apache/druid/common/EitherTest.java
@@ -96,8 +96,9 @@
     MatcherAssert.assertThat(either.error(), CoreMatchers.instanceOf(AssertionError.class));
     MatcherAssert.assertThat(either.error().getMessage(), CoreMatchers.equalTo("oh no"));
 
-    final AssertionError e = Assert.assertThrows(AssertionError.class, either::valueOrThrow);
-    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
+    final RuntimeException e = Assert.assertThrows(RuntimeException.class, either::valueOrThrow);
+    MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(AssertionError.class));
+    MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.equalTo("oh no"));
 
     // Test toString.
     Assert.assertEquals("Error[java.lang.AssertionError: oh no]", either.toString());
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
index 0d8988b..1b0aa00 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
@@ -28,6 +28,7 @@
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 
+import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -162,12 +163,12 @@
     }
 
     @Override
-    public void fail()
+    public void fail(@Nullable Throwable cause)
     {
       synchronized (lock) {
         queue.clear();
 
-        if (!queue.offer(Optional.of(Either.error(new ISE("Aborted"))))) {
+        if (!queue.offer(Optional.of(Either.error(cause != null ? cause : new ISE("Failed"))))) {
           // If this happens, it's a bug, potentially due to incorrectly using this class with multiple writers.
           throw new ISE("Could not write error to channel");
         }
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
index 6ec86c4..ccb0166 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java
@@ -22,6 +22,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.frame.Frame;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -54,16 +55,20 @@
 
   /**
    * Called prior to {@link #close()} if the writer has failed. Must be followed by a call to {@link #close()}.
+   *
+   * @param cause optional cause of failure. Used by the in-memory channel {@link BlockingQueueFrameChannel.Writable}
+   *              to propagate exceptions to downstream processors. Most other channels ignore the provided cause.
    */
-  void fail() throws IOException;
+  void fail(@Nullable Throwable cause) throws IOException;
 
   /**
    * Finish writing to this channel.
    *
-   * When this method is called without {@link #fail()} having previously been called, the writer is understood to have
-   * completed successfully.
+   * When this method is called without {@link #fail(Throwable)} having previously been called, the writer is
+   * understood to have completed successfully.
    *
-   * After calling this method, no additional calls to {@link #write}, {@link #fail()}, or this method are permitted.
+   * After calling this method, no additional calls to {@link #write}, {@link #fail(Throwable)}, or this method
+   * are permitted.
    */
   @Override
   void close() throws IOException;
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
index 231b8b8..8ed0a5b 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/WritableFrameFileChannel.java
@@ -23,6 +23,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.frame.file.FrameFileWriter;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -44,8 +45,9 @@
   }
 
   @Override
-  public void fail() throws IOException
+  public void fail(@Nullable Throwable cause) throws IOException
   {
+    // Cause is ignored when writing to frame files. Readers can tell the file is truncated, but they won't know why.
     writer.abort();
   }
 
diff --git a/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java b/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
index 08645f5..0f70fb3 100644
--- a/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
+++ b/processing/src/main/java/org/apache/druid/frame/file/FrameFileHttpResponseHandler.java
@@ -40,9 +40,15 @@
  * to back off from issuing the next request, if appropriate. However: the handler does not implement backpressure
  * through the {@link HttpResponseHandler.TrafficCop} mechanism. Therefore, it is important that each request retrieve
  * a modest amount of data.
+ *
+ * The last fetch must be empty (zero content bytes) and must have the header {@link #HEADER_LAST_FETCH_NAME} set to
+ * {@link #HEADER_LAST_FETCH_VALUE}. Under these conditions, {@link FrameFilePartialFetch#isLastFetch()} returns true.
  */
 public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFilePartialFetch, FrameFilePartialFetch>
 {
+  public static final String HEADER_LAST_FETCH_NAME = "X-Druid-Frame-Last-Fetch";
+  public static final String HEADER_LAST_FETCH_VALUE = "yes";
+
   private final ReadableByteChunksFrameChannel channel;
 
   public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel)
@@ -53,14 +59,17 @@
   @Override
   public ClientResponse<FrameFilePartialFetch> handleResponse(final HttpResponse response, final TrafficCop trafficCop)
   {
-    final ClientResponse<FrameFilePartialFetch> clientResponse = ClientResponse.unfinished(new FrameFilePartialFetch());
-
     if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
       // Note: if the error body is chunked, we will discard all future chunks due to setting exceptionCaught here.
       // This is OK because we don't need the body; just the HTTP status code.
+      final ClientResponse<FrameFilePartialFetch> clientResponse =
+          ClientResponse.unfinished(new FrameFilePartialFetch(false));
       exceptionCaught(clientResponse, new ISE("Server for [%s] returned [%s]", channel.getId(), response.getStatus()));
       return clientResponse;
     } else {
+      final boolean lastFetchHeaderSet = HEADER_LAST_FETCH_VALUE.equals(response.headers().get(HEADER_LAST_FETCH_NAME));
+      final ClientResponse<FrameFilePartialFetch> clientResponse =
+          ClientResponse.unfinished(new FrameFilePartialFetch(lastFetchHeaderSet));
       return response(clientResponse, response.getContent());
     }
   }
diff --git a/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java b/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
index 2aae404..8c2056d 100644
--- a/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
+++ b/processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java
@@ -33,6 +33,7 @@
  */
 public class FrameFilePartialFetch
 {
+  private final boolean lastFetchHeaderSet;
   private long bytesRead;
 
   @Nullable
@@ -41,13 +42,14 @@
   @Nullable
   private ListenableFuture<?> backpressureFuture;
 
-  FrameFilePartialFetch()
+  FrameFilePartialFetch(boolean lastFetchHeaderSet)
   {
+    this.lastFetchHeaderSet = lastFetchHeaderSet;
   }
 
-  public boolean isEmptyFetch()
+  public boolean isLastFetch()
   {
-    return exceptionCaught == null && bytesRead == 0L;
+    return exceptionCaught == null && lastFetchHeaderSet && bytesRead == 0L;
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index f83c700..43221de 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -50,6 +50,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -317,7 +318,7 @@
       {
         for (final WritableFrameChannel outputChannel : outputChannels) {
           try {
-            outputChannel.fail();
+            outputChannel.fail(e);
           }
           catch (Throwable e1) {
             e.addSuppressed(e1);
@@ -535,7 +536,7 @@
       // Fail all output channels prior to calling cleanup.
       for (final WritableFrameChannel outputChannel : processor.outputChannels()) {
         try {
-          outputChannel.fail();
+          outputChannel.fail(new CancellationException("Canceled"));
         }
         catch (Throwable e) {
           log.debug(e, "Exception encountered while marking output channel failed for processor [%s]", processor);
diff --git a/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java b/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
index d5d4722..c6c2dc1 100644
--- a/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.java
@@ -119,14 +119,14 @@
     Assert.assertFalse(response1.isFinished());
     Assert.assertTrue(response1.isContinueReading());
     Assert.assertFalse(response1.getObj().isExceptionCaught());
-    Assert.assertFalse(response1.getObj().isEmptyFetch());
+    Assert.assertFalse(response1.getObj().isLastFetch());
 
     final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);
 
     Assert.assertTrue(response2.isFinished());
     Assert.assertTrue(response2.isContinueReading());
     Assert.assertFalse(response2.getObj().isExceptionCaught());
-    Assert.assertFalse(response2.getObj().isEmptyFetch());
+    Assert.assertFalse(response2.getObj().isLastFetch());
 
     final ListenableFuture<?> backpressureFuture = response2.getObj().backpressureFuture();
     Assert.assertFalse(backpressureFuture.isDone());
@@ -143,7 +143,7 @@
   }
 
   @Test
-  public void testEmptyResponse()
+  public void testEmptyResponseWithoutLastFetchHeader()
   {
     final ClientResponse<FrameFilePartialFetch> response1 = handler.handleResponse(
         makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY),
@@ -153,14 +153,42 @@
     Assert.assertFalse(response1.isFinished());
     Assert.assertTrue(response1.isContinueReading());
     Assert.assertFalse(response1.getObj().isExceptionCaught());
-    Assert.assertTrue(response1.getObj().isEmptyFetch());
+    Assert.assertFalse(response1.getObj().isLastFetch());
 
     final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);
 
     Assert.assertTrue(response2.isFinished());
     Assert.assertTrue(response2.isContinueReading());
     Assert.assertFalse(response2.getObj().isExceptionCaught());
-    Assert.assertTrue(response2.getObj().isEmptyFetch());
+    Assert.assertFalse(response2.getObj().isLastFetch());
+    Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
+  }
+
+  @Test
+  public void testEmptyResponseWithLastFetchHeader()
+  {
+    final HttpResponse serverResponse = makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY);
+    serverResponse.headers().set(
+        FrameFileHttpResponseHandler.HEADER_LAST_FETCH_NAME,
+        FrameFileHttpResponseHandler.HEADER_LAST_FETCH_VALUE
+    );
+
+    final ClientResponse<FrameFilePartialFetch> response1 = handler.handleResponse(
+        serverResponse,
+        null
+    );
+
+    Assert.assertFalse(response1.isFinished());
+    Assert.assertTrue(response1.isContinueReading());
+    Assert.assertFalse(response1.getObj().isExceptionCaught());
+    Assert.assertTrue(response1.getObj().isLastFetch());
+
+    final ClientResponse<FrameFilePartialFetch> response2 = handler.done(response1);
+
+    Assert.assertTrue(response2.isFinished());
+    Assert.assertTrue(response2.isContinueReading());
+    Assert.assertFalse(response2.getObj().isExceptionCaught());
+    Assert.assertTrue(response2.getObj().isLastFetch());
     Assert.assertTrue(response2.getObj().backpressureFuture().isDone());
   }
 
@@ -186,7 +214,7 @@
 
       Assert.assertFalse(response.isFinished());
       Assert.assertFalse(response.getObj().isExceptionCaught());
-      Assert.assertFalse(response.getObj().isEmptyFetch());
+      Assert.assertFalse(response.getObj().isLastFetch());
     }
 
     final ClientResponse<FrameFilePartialFetch> finalResponse = handler.done(response);
@@ -194,7 +222,7 @@
     Assert.assertTrue(finalResponse.isFinished());
     Assert.assertTrue(finalResponse.isContinueReading());
     Assert.assertFalse(response.getObj().isExceptionCaught());
-    Assert.assertFalse(response.getObj().isEmptyFetch());
+    Assert.assertFalse(response.getObj().isLastFetch());
 
     final ListenableFuture<?> backpressureFuture = response.getObj().backpressureFuture();
     Assert.assertFalse(backpressureFuture.isDone());
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
index 792436e..349fdb2 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
@@ -195,18 +195,23 @@
       );
 
       MatcherAssert.assertThat(
-          e.getCause(),
+          e.getCause().getCause(),
           ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
       );
 
       final ReadableFrameChannel outReadableChannel = outChannel.readable();
       Assert.assertTrue(outReadableChannel.canRead());
 
-      Assert.assertThrows(
-          IllegalStateException.class,
+      final RuntimeException readException = Assert.assertThrows(
+          RuntimeException.class,
           outReadableChannel::read
       );
 
+      MatcherAssert.assertThat(
+          readException.getCause(),
+          ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("failure!"))
+      );
+
       Assert.assertTrue(outReadableChannel.isFinished()); // Finished now that we read the error
     }
 
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
index caec798..cd13bcd 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
@@ -216,7 +216,11 @@
 
     final ExecutionException e = Assert.assertThrows(ExecutionException.class, future::get);
     MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RuntimeException.class));
-    MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!")));
+    MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(RuntimeException.class));
+    MatcherAssert.assertThat(
+        e.getCause().getCause(),
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("failure!"))
+    );
   }
 
   @Test