Try to fix error msg like: 301: queue has been destroyed
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 6c556c1..b3b94f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -38,6 +38,7 @@
import java.util.LinkedList;
import java.util.Queue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -177,6 +178,15 @@
*/
public TsBlock remove() {
if (closed) {
+ // try throw underlying exception instead of "Source handle is aborted."
+ try {
+ blocked.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e.getCause() == null ? e : e.getCause());
+ }
throw new IllegalStateException("queue has been destroyed");
}
TsBlock tsBlock = queue.remove();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 315e75c..4da89f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -254,6 +255,17 @@
private void checkState() {
if (aborted) {
+ if (queue.isBlocked().isDone()) {
+ // try throw underlying exception instead of "Source handle is aborted."
+ try {
+ queue.isBlocked().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e.getCause() == null ? e : e.getCause());
+ }
+ }
throw new IllegalStateException("Source handle is aborted.");
} else if (closed) {
throw new IllegalStateException("Source Handle is closed.");