fix the NPE error when write failed using Ratis (#6945)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java
index 5319588..fdb7626 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.rpc.TSStatusCode;
public class ConsensusWriteResponse extends ConsensusResponse {
@@ -40,6 +41,22 @@
return "ConsensusWriteResponse{" + "status=" + status + "} " + super.toString();
}
+ public boolean isSuccessful() {
+ return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ public String getErrorMessage() {
+ if (status != null && status.message != null && status.message.length() > 0) {
+ return status.message;
+ }
+ if (exception != null
+ && exception.getMessage() != null
+ && exception.getMessage().length() > 0) {
+ return exception.getMessage();
+ }
+ return "unknown error message";
+ }
+
public static ConsensusWriteResponse.Builder newBuilder() {
return new ConsensusWriteResponse.Builder();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index fa3e47e..f4a756d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -275,9 +275,14 @@
writeResponse = SchemaRegionConsensusImpl.getInstance().write(groupId, planNode);
}
- if (writeResponse.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logger.error(writeResponse.getStatus().message);
- throw new FragmentInstanceDispatchException(writeResponse.getStatus());
+ if (!writeResponse.isSuccessful()) {
+ logger.error(writeResponse.getErrorMessage());
+ TSStatus failureStatus =
+ writeResponse.getStatus() != null
+ ? writeResponse.getStatus()
+ : RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, writeResponse.getErrorMessage());
+ throw new FragmentInstanceDispatchException(failureStatus);
} else if (hasFailedMeasurement) {
throw new FragmentInstanceDispatchException(
RpcUtils.getStatus(