Try to fix error msg like: 301: queue has been destroyed

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 6c556c1..b3b94f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -38,6 +38,7 @@
 
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -177,6 +178,15 @@
    */
   public TsBlock remove() {
     if (closed) {
+      // try throw underlying exception instead of "Source handle is aborted."
+      try {
+        blocked.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException(e.getCause() == null ? e : e.getCause());
+      }
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 315e75c..4da89f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -36,6 +36,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -254,6 +255,17 @@
 
   private void checkState() {
     if (aborted) {
+      if (queue.isBlocked().isDone()) {
+        // try throw underlying exception instead of "Source handle is aborted."
+        try {
+          queue.isBlocked().get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        } catch (ExecutionException e) {
+          throw new IllegalStateException(e.getCause() == null ? e : e.getCause());
+        }
+      }
       throw new IllegalStateException("Source handle is aborted.");
     } else if (closed) {
       throw new IllegalStateException("Source Handle is closed.");