[IOTDB-6336] Add configurations for max retry duration and whether to retry for unknown errors

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bbcbc0e..fd1068b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1824,6 +1824,9 @@
 
       // update Consensus config
       reloadConsensusProps(properties);
+
+      // update retry config
+      commonDescriptor.loadRetryProperties(properties);
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index c004356..230eb27 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -44,7 +44,7 @@
 
 public class AsyncPlanNodeSender {
 
-  private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
   private final List<FragmentInstance> instances;
@@ -116,14 +116,14 @@
       status = entry.getValue().getStatus();
       if (!entry.getValue().accepted) {
         if (status == null) {
-          logger.warn(
+          LOGGER.warn(
               "dispatch write failed. message: {}, node {}",
               entry.getValue().message,
               instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
           failureStatusList.add(
               RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()));
         } else {
-          logger.warn(
+          LOGGER.warn(
               "dispatch write failed. status: {}, code: {}, message: {}, node {}",
               entry.getValue().status,
               TSStatusCode.representOf(status.code),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index d401a61..7fe941a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -25,6 +25,8 @@
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
@@ -66,8 +68,11 @@
 
 public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
-  private static final Logger logger =
+  private static final Logger LOGGER =
       LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
+
+  private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
+
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
   private final QueryType type;
@@ -126,7 +131,7 @@
       } catch (FragmentInstanceDispatchException e) {
         return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
       } catch (Throwable t) {
-        logger.warn(DISPATCH_FAILED, t);
+        LOGGER.warn(DISPATCH_FAILED, t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
@@ -165,7 +170,7 @@
           }
         }
       } catch (Throwable t) {
-        logger.warn(DISPATCH_FAILED, t);
+        LOGGER.warn(DISPATCH_FAILED, t);
         failureStatusList.add(
             RpcUtils.getStatus(
                 TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
@@ -210,7 +215,7 @@
         } catch (FragmentInstanceDispatchException e) {
           dataNodeFailureList.add(e.getFailureStatus());
         } catch (Throwable t) {
-          logger.warn(DISPATCH_FAILED, t);
+          LOGGER.warn(DISPATCH_FAILED, t);
           dataNodeFailureList.add(
               RpcUtils.getStatus(
                   TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
@@ -222,28 +227,35 @@
     // wait until remote dispatch done
     try {
       asyncPlanNodeSender.waitUntilCompleted();
-
-      if (asyncPlanNodeSender.needRetry()) {
+      final long maxRetryDurationInNs =
+          COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
+              ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
+              : 0;
+      if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
         // retry failed remote FIs
-        int retry = 0;
-        final int maxRetryTimes = 10;
-        long waitMillis = getRetrySleepTime(retry);
+        int retryCount = 0;
+        long waitMillis = getRetrySleepTime(retryCount);
+        long retryStartTime = System.nanoTime();
 
         while (asyncPlanNodeSender.needRetry()) {
-          retry++;
+          retryCount++;
           asyncPlanNodeSender.retry();
-          if (!(asyncPlanNodeSender.needRetry() && retry < maxRetryTimes)) {
+          // stop if no more retry is needed, or if elapsed time plus the next sleep would exceed maxRetryDurationInNs
+          if (!(asyncPlanNodeSender.needRetry()
+              && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L)
+                  < maxRetryDurationInNs)) {
             break;
           }
           // still need to retry, sleep some time before making another retry.
           Thread.sleep(waitMillis);
-          waitMillis = getRetrySleepTime(retry);
+          PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L);
+          waitMillis = getRetrySleepTime(retryCount);
         }
       }
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.error("Interrupted when dispatching write async", e);
+      LOGGER.error("Interrupted when dispatching write async", e);
       return immediateFuture(
           new FragInstanceDispatchResult(
               RpcUtils.getStatus(
@@ -308,7 +320,7 @@
           TSendFragmentInstanceResp sendFragmentInstanceResp =
               client.sendFragmentInstance(sendFragmentInstanceReq);
           if (!sendFragmentInstanceResp.accepted) {
-            logger.warn(sendFragmentInstanceResp.message);
+            LOGGER.warn(sendFragmentInstanceResp.message);
             if (sendFragmentInstanceResp.isSetNeedRetry()
                 && sendFragmentInstanceResp.isNeedRetry()) {
               throw new RatisReadUnavailableException(sendFragmentInstanceResp.message);
@@ -330,7 +342,7 @@
           TSendSinglePlanNodeResp sendPlanNodeResp =
               client.sendBatchPlanNode(sendPlanNodeReq).getResponses().get(0);
           if (!sendPlanNodeResp.accepted) {
-            logger.warn(
+            LOGGER.warn(
                 "dispatch write failed. status: {}, code: {}, message: {}, node {}",
                 sendPlanNodeResp.status,
                 TSStatusCode.representOf(sendPlanNodeResp.status.code),
@@ -366,7 +378,7 @@
     try {
       dispatchRemoteHelper(instance, endPoint);
     } catch (ClientManagerException | TException | RatisReadUnavailableException e) {
-      logger.warn(
+      LOGGER.warn(
           "can't execute request on node {}, error msg is {}, and we try to reconnect this node.",
           endPoint,
           ExceptionUtils.getRootCause(e).toString());
@@ -374,7 +386,7 @@
       try {
         dispatchRemoteHelper(instance, endPoint);
       } catch (ClientManagerException | TException | RatisReadUnavailableException e1) {
-        logger.warn(
+        LOGGER.warn(
             "can't execute request on node  {} in second try, error msg is {}.",
             endPoint,
             ExceptionUtils.getRootCause(e1).toString());
@@ -398,7 +410,7 @@
             ConsensusGroupId.Factory.createFromTConsensusGroupId(
                 instance.getRegionReplicaSet().getRegionId());
       } catch (Throwable t) {
-        logger.warn("Deserialize ConsensusGroupId failed. ", t);
+        LOGGER.warn("Deserialize ConsensusGroupId failed. ", t);
         throw new FragmentInstanceDispatchException(
             RpcUtils.getStatus(
                 TSStatusCode.EXECUTE_STATEMENT_ERROR,
@@ -414,7 +426,7 @@
                 ? readExecutor.execute(instance)
                 : readExecutor.execute(groupId, instance);
         if (!readResult.isAccepted()) {
-          logger.warn(readResult.getMessage());
+          LOGGER.warn(readResult.getMessage());
           throw new FragmentInstanceDispatchException(
               RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, readResult.getMessage()));
         }
@@ -426,7 +438,7 @@
         if (!writeResult.isAccepted()) {
           // DO NOT LOG READ_ONLY ERROR
           if (writeResult.getStatus().getCode() != TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
-            logger.warn(
+            LOGGER.warn(
                 "write locally failed. TSStatus: {}, message: {}",
                 writeResult.getStatus(),
                 writeResult.getMessage());
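
For context, below is a minimal self-contained sketch of the duration-bounded retry loop introduced above, which replaces the previous fixed limit of 10 retries. The body of getRetrySleepTime is not part of this diff, so the doubling-with-cap implementation here is only an assumption based on the backoff described in iotdb-system.properties; needRetry/attemptOnce are illustrative placeholders for AsyncPlanNodeSender.needRetry()/retry(), not IoTDB APIs.

// Sketch only: duration-bounded retry with exponential backoff (not the IoTDB implementation).
public class RetryBudgetSketch {

  // Assumed backoff: 100ms, 200ms, 400ms, ... capped at 20,000ms (matches the properties comment).
  private static long getRetrySleepTime(int retryCount) {
    return Math.min(100L * (1L << Math.min(retryCount, 20)), 20_000L);
  }

  public static void retryWithBudget(long maxRetryDurationInMs) throws InterruptedException {
    if (maxRetryDurationInMs <= 0) {
      return; // retrying disabled, mirrors the maxRetryDurationInNs > 0 guard above
    }
    final long maxRetryDurationInNs = maxRetryDurationInMs * 1_000_000L;
    final long retryStartTime = System.nanoTime();
    int retryCount = 0;
    long waitMillis = getRetrySleepTime(retryCount);

    while (needRetry()) {
      retryCount++;
      attemptOnce();
      // stop if no retry is needed, or elapsed time plus the next sleep would exceed the budget
      if (!(needRetry()
          && System.nanoTime() - retryStartTime + waitMillis * 1_000_000L < maxRetryDurationInNs)) {
        break;
      }
      Thread.sleep(waitMillis);
      waitMillis = getRetrySleepTime(retryCount);
    }
  }

  // Placeholders standing in for AsyncPlanNodeSender.needRetry() / retry().
  private static boolean needRetry() { return false; }
  private static void attemptOnce() {}
}
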
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index b92142e..958765e 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1849,3 +1849,21 @@
 # Default value is -1, which means no limit.
 # Datatype: int
 # load_write_throughput_bytes_per_second=-1
+
+####################
+### Dispatch Retry Configuration
+####################
+
+# The maximum retry duration for remotely dispatched write requests, in milliseconds.
+# It only takes effect for remote dispatching of write requests, not for local dispatching or queries.
+# Set it to 0 or a negative number to disable retrying of remotely dispatched write requests.
+# We sleep between retries with exponential backoff: 100ms, 200ms, 400ms, 800ms and so on, capped at 20,000ms.
+# effectiveMode: hot_reload
+# Datatype: long
+# write_request_remote_dispatch_max_retry_duration_in_ms=60000
+
+# Whether to retry for unknown errors.
+# Currently, unknown errors include EXECUTE_STATEMENT_ERROR(301) and INTERNAL_SERVER_ERROR(305).
+# effectiveMode: hot_reload
+# Datatype: boolean
+# enable_retry_for_unknown_error=false
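
As a rough worked example under the default 60,000 ms budget: assuming the sleep schedule doubles from 100 ms up to the 20,000 ms cap as described above, and ignoring the time spent by the dispatch attempts themselves (which in reality also counts toward the budget), the loop stops once the accumulated sleep plus the next sleep would exceed the budget:

// Hypothetical illustration of how many attempts fit into the default budget.
public class RetryScheduleExample {
  public static void main(String[] args) {
    final long budgetMs = 60_000L; // write_request_remote_dispatch_max_retry_duration_in_ms default
    long sleepMs = 100L;
    long accumulatedSleepMs = 0L;
    int attempts = 1; // the first attempt happens before any sleep
    while (accumulatedSleepMs + sleepMs < budgetMs) {
      accumulatedSleepMs += sleepMs;
      attempts++;
      sleepMs = Math.min(sleepMs * 2, 20_000L);
    }
    // Under these assumptions this prints 10 attempts with 45,500 ms of accumulated sleep.
    System.out.printf("attempts=%d, accumulated sleep=%d ms%n", attempts, accumulatedSleepMs);
  }
}
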
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b78f7dc..1de7f33 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -280,6 +280,10 @@
   private final Set<String> enabledKillPoints =
       KillPoint.parseKillPoints(System.getProperty(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS));
 
+  private volatile boolean retryForUnknownErrors = false;
+
+  private volatile long remoteWriteMaxRetryDurationInMs = 60000;
+
   CommonConfig() {
     // Empty constructor
   }
@@ -1210,4 +1214,20 @@
   public Set<String> getEnabledKillPoints() {
     return enabledKillPoints;
   }
+
+  public boolean isRetryForUnknownErrors() {
+    return retryForUnknownErrors;
+  }
+
+  public void setRetryForUnknownErrors(boolean retryForUnknownErrors) {
+    this.retryForUnknownErrors = retryForUnknownErrors;
+  }
+
+  public long getRemoteWriteMaxRetryDurationInMs() {
+    return remoteWriteMaxRetryDurationInMs;
+  }
+
+  public void setRemoteWriteMaxRetryDurationInMs(long remoteWriteMaxRetryDurationInMs) {
+    this.remoteWriteMaxRetryDurationInMs = remoteWriteMaxRetryDurationInMs;
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5979a6f..3d42596 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -246,6 +246,8 @@
             properties.getProperty(
                 "cluster_device_limit_threshold",
                 String.valueOf(config.getDeviceLimitThreshold()))));
+
+    loadRetryProperties(properties);
   }
 
   private void loadPipeProps(Properties properties) {
@@ -614,6 +616,20 @@
                 String.valueOf(config.getSubscriptionReadFileBufferSize()))));
   }
 
+  public void loadRetryProperties(Properties properties) {
+    config.setRemoteWriteMaxRetryDurationInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "write_request_remote_dispatch_max_retry_duration_in_ms",
+                String.valueOf(config.getRemoteWriteMaxRetryDurationInMs()))));
+
+    config.setRetryForUnknownErrors(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_retry_for_unknown_error",
+                String.valueOf(config.isRetryForUnknownErrors()))));
+  }
+
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
     config.setTimestampPrecision(globalConfig.timestampPrecision);
     config.setTimePartitionInterval(
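
Note the defaulting pattern used in loadRetryProperties above: each property falls back to the value currently held in CommonConfig, so a hot reload that omits a key leaves the previous setting in place. A minimal sketch of that pattern (names are illustrative, not IoTDB APIs):

import java.util.Properties;

// Illustrative only: re-read a long-valued property, defaulting to the value already in memory.
final class PropertyReloadSketch {
  static long reloadLong(Properties properties, String key, long currentValue) {
    return Long.parseLong(properties.getProperty(key, String.valueOf(currentValue)));
  }
}
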
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
index 1efbe62..14e3958 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
@@ -111,6 +111,7 @@
       Metric.PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL.toString();
   private static final String LOCAL_SCHEDULE = "local_scheduler";
   private static final String REMOTE_SCHEDULE = "remote_scheduler";
+  private static final String REMOTE_RETRY_SLEEP = "remote_retry";
 
   static {
     metricInfoMap.put(
@@ -127,10 +128,18 @@
             PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
             Tag.STAGE.toString(),
             REMOTE_SCHEDULE));
+    metricInfoMap.put(
+        REMOTE_RETRY_SLEEP,
+        new MetricInfo(
+            MetricType.TIMER,
+            PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
+            Tag.STAGE.toString(),
+            REMOTE_RETRY_SLEEP));
   }
 
   private Timer localScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer remoteScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer remoteRetrySleepTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   /** Record the time cost of local schedule. */
   public void recordScheduleLocalCost(long costTimeInNanos) {
@@ -142,6 +151,11 @@
     remoteScheduleTimer.updateNanos(costTimeInNanos);
   }
 
+  /** Record the sleep time cost between retries during remote schedule. */
+  public void recordRemoteRetrySleepCost(long costTimeInNanos) {
+    remoteRetrySleepTimer.updateNanos(costTimeInNanos);
+  }
+
   // endregion
 
   // region local schedule
@@ -327,6 +341,13 @@
             MetricLevel.CORE,
             Tag.STAGE.toString(),
             REMOTE_SCHEDULE);
+    remoteRetrySleepTimer =
+        metricService.getOrCreateTimer(
+            PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
+            MetricLevel.CORE,
+            Tag.STAGE.toString(),
+            REMOTE_RETRY_SLEEP);
+
     // bind local schedule metrics
     schemaValidateTimer =
         metricService.getOrCreateTimer(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index be32bf5..97811bc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -40,11 +40,16 @@
 
   private static final Set<Integer> NEED_RETRY = new HashSet<>();
 
+  private static final Set<Integer> UNKNOWN_ERRORS = new HashSet<>();
+
   private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
 
   static {
-    NEED_RETRY.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-    NEED_RETRY.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    // UNKNOWN ERRORS
+    UNKNOWN_ERRORS.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    UNKNOWN_ERRORS.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+
+    // KNOWN ERRORS
     NEED_RETRY.add(TSStatusCode.DISPATCH_ERROR.getStatusCode());
     NEED_RETRY.add(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
     NEED_RETRY.add(TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode());
@@ -213,14 +218,23 @@
     int code = status.getCode();
     if (code == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       for (TSStatus subStatus : status.subStatus) {
+        // if any sub status of MULTIPLE_ERROR does not need retry, we won't retry the whole
+        // request
         if (subStatus == null
-            || (subStatus.getCode() != OK.code && !NEED_RETRY.contains(subStatus.getCode()))) {
+            || (subStatus.getCode() != OK.code
+                && !needRetryHelperForSingleStatus(subStatus.getCode()))) {
           return false;
         }
       }
       return true;
     } else {
-      return NEED_RETRY.contains(code);
+      return needRetryHelperForSingleStatus(code);
     }
   }
+
+  // decides retry for a single status code; MULTIPLE_ERROR(302) is handled by the caller
+  private static boolean needRetryHelperForSingleStatus(int statusCode) {
+    return NEED_RETRY.contains(statusCode)
+        || (COMMON_CONFIG.isRetryForUnknownErrors() && UNKNOWN_ERRORS.contains(statusCode));
+  }
 }
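
A self-contained sketch of the retry decision after this change (illustrative names, not the IoTDB API): known retriable codes always retry, the two unknown codes retry only when enable_retry_for_unknown_error is set, and MULTIPLE_ERROR(302) retries only if every non-OK sub status is itself retriable. The numeric value of OK is assumed to be 200 (TSStatusCode.SUCCESS_STATUS); only the codes named in this diff are listed.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative stand-in for the StatusUtils.needRetry logic, not the IoTDB implementation.
public class RetryDecisionSketch {
  private static final int OK = 200;                      // assumed: TSStatusCode.SUCCESS_STATUS
  private static final int EXECUTE_STATEMENT_ERROR = 301; // "unknown" error
  private static final int MULTIPLE_ERROR = 302;
  private static final int INTERNAL_SERVER_ERROR = 305;   // "unknown" error

  private static final Set<Integer> KNOWN_RETRIABLE = new HashSet<>(); // DISPATCH_ERROR, SYSTEM_READ_ONLY, ...
  private static final Set<Integer> UNKNOWN_ERRORS = new HashSet<>();

  static {
    UNKNOWN_ERRORS.add(EXECUTE_STATEMENT_ERROR);
    UNKNOWN_ERRORS.add(INTERNAL_SERVER_ERROR);
  }

  // stands in for CommonConfig#isRetryForUnknownErrors (enable_retry_for_unknown_error)
  private static volatile boolean retryForUnknownErrors = false;

  static boolean needRetry(int code, List<Integer> subCodes) {
    if (code == MULTIPLE_ERROR) {
      // retry the whole request only if every non-OK sub status is itself retriable
      for (Integer sub : subCodes) {
        if (sub == null || (sub != OK && !needRetrySingle(sub))) {
          return false;
        }
      }
      return true;
    }
    return needRetrySingle(code);
  }

  private static boolean needRetrySingle(int code) {
    return KNOWN_RETRIABLE.contains(code)
        || (retryForUnknownErrors && UNKNOWN_ERRORS.contains(code));
  }
}
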