Try to fix unstable exception propagation
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index e28ddb4..a5cbeba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -82,6 +82,8 @@
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();
+ private Throwable abortedCause = null;
+
// used for SharedTsBlockQueue listener
private final ExecutorService executorService;
@@ -179,6 +181,9 @@
public TsBlock remove() {
if (closed) {
// try throw underlying exception instead of "Source handle is aborted."
+ if (abortedCause != null) {
+ throw new IllegalStateException(abortedCause);
+ }
try {
blocked.get();
} catch (InterruptedException e) {
@@ -342,6 +347,7 @@
if (closed) {
return;
}
+ abortedCause = t;
closed = true;
if (!blocked.isDone()) {
blocked.setException(t);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index f5ac1e3..e1bc7c9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -322,23 +322,6 @@
.collect(Collectors.toList());
}
- public Optional<TSStatus> getErrorCode() {
- return stateMachine.getFailureCauses().stream()
- .filter(e -> e instanceof IoTDBException || e instanceof IoTDBRuntimeException)
- .findFirst()
- .flatMap(
- t -> {
- TSStatus status;
- if (t instanceof IoTDBException) {
- status = new TSStatus(((IoTDBException) t).getErrorCode());
- } else {
- status = new TSStatus(((IoTDBRuntimeException) t).getErrorCode());
- }
- status.setMessage(t.getMessage());
- return Optional.of(status);
- });
- }
-
public void finished() {
stateMachine.finished();
}
@@ -395,6 +378,8 @@
} else if (failure instanceof IoTDBRuntimeException) {
status = new TSStatus(((IoTDBRuntimeException) failure).getErrorCode());
status.setMessage(failure.getMessage());
+ } else {
+ LOGGER.warn("[Unknown exception]: ", failure);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index fbca6d8..2bef087 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -117,23 +117,7 @@
}
public FragmentInstanceInfo getInstanceInfo() {
- return context
- .getErrorCode()
- .map(
- s ->
- new FragmentInstanceInfo(
- stateMachine.getState(),
- context.getEndTime(),
- context.getFailedCause(),
- context.getFailureInfoList(),
- s))
- .orElseGet(
- () ->
- new FragmentInstanceInfo(
- stateMachine.getState(),
- context.getEndTime(),
- context.getFailedCause(),
- context.getFailureInfoList()));
+ return context.getInstanceInfo();
}
public long getStartTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 1e13446..6cf1d87 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -52,7 +51,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -391,23 +389,7 @@
private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
FragmentInstanceContext context = instanceContext.get(instanceId);
- Optional<TSStatus> errorCode = context.getErrorCode();
- return errorCode
- .map(
- tsStatus ->
- new FragmentInstanceInfo(
- FragmentInstanceState.FAILED,
- context.getEndTime(),
- context.getFailedCause(),
- context.getFailureInfoList(),
- tsStatus))
- .orElseGet(
- () ->
- new FragmentInstanceInfo(
- FragmentInstanceState.FAILED,
- context.getEndTime(),
- context.getFailedCause(),
- context.getFailureInfoList()));
+ return context.getInstanceInfo();
}
private void removeOldInstances() {