[IOTDB-6336] Add max retry time duration and whether to retry for unknown errors configurations
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bbcbc0e..fd1068b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1824,6 +1824,9 @@
// update Consensus config
reloadConsensusProps(properties);
+
+ // update retry config
+ commonDescriptor.loadRetryProperties(properties);
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index c004356..230eb27 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -44,7 +44,7 @@
public class AsyncPlanNodeSender {
- private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncInternalServiceClientManager;
private final List<FragmentInstance> instances;
@@ -116,14 +116,14 @@
status = entry.getValue().getStatus();
if (!entry.getValue().accepted) {
if (status == null) {
- logger.warn(
+ LOGGER.warn(
"dispatch write failed. message: {}, node {}",
entry.getValue().message,
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
failureStatusList.add(
RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()));
} else {
- logger.warn(
+ LOGGER.warn(
"dispatch write failed. status: {}, code: {}, message: {}, node {}",
entry.getValue().status,
TSStatusCode.representOf(status.code),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index d401a61..7fe941a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -25,6 +25,8 @@
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
@@ -66,8 +68,11 @@
public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
- private static final Logger logger =
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
+
+ private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
+
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
private final QueryType type;
@@ -126,7 +131,7 @@
} catch (FragmentInstanceDispatchException e) {
return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
} catch (Throwable t) {
- logger.warn(DISPATCH_FAILED, t);
+ LOGGER.warn(DISPATCH_FAILED, t);
return immediateFuture(
new FragInstanceDispatchResult(
RpcUtils.getStatus(
@@ -165,7 +170,7 @@
}
}
} catch (Throwable t) {
- logger.warn(DISPATCH_FAILED, t);
+ LOGGER.warn(DISPATCH_FAILED, t);
failureStatusList.add(
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
@@ -210,7 +215,7 @@
} catch (FragmentInstanceDispatchException e) {
dataNodeFailureList.add(e.getFailureStatus());
} catch (Throwable t) {
- logger.warn(DISPATCH_FAILED, t);
+ LOGGER.warn(DISPATCH_FAILED, t);
dataNodeFailureList.add(
RpcUtils.getStatus(
TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
@@ -222,28 +227,35 @@
// wait until remote dispatch done
try {
asyncPlanNodeSender.waitUntilCompleted();
-
- if (asyncPlanNodeSender.needRetry()) {
+ final long maxRetryDurationInNs =
+ COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
+ ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
+ : 0;
+ if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
// retry failed remote FIs
- int retry = 0;
- final int maxRetryTimes = 10;
- long waitMillis = getRetrySleepTime(retry);
+ int retryCount = 0;
+ long waitMillis = getRetrySleepTime(retryCount);
+ long retryStartTime = System.nanoTime();
while (asyncPlanNodeSender.needRetry()) {
- retry++;
+ retryCount++;
asyncPlanNodeSender.retry();
- if (!(asyncPlanNodeSender.needRetry() && retry < maxRetryTimes)) {
+              // break if no further retry is needed, or if elapsed time plus the next sleep
+ if (!(asyncPlanNodeSender.needRetry()
+ && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L)
+ < maxRetryDurationInNs)) {
break;
}
// still need to retry, sleep some time before make another retry.
Thread.sleep(waitMillis);
- waitMillis = getRetrySleepTime(retry);
+ PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis * 1_000_000L);
+ waitMillis = getRetrySleepTime(retryCount);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Interrupted when dispatching write async", e);
+ LOGGER.error("Interrupted when dispatching write async", e);
return immediateFuture(
new FragInstanceDispatchResult(
RpcUtils.getStatus(
@@ -308,7 +320,7 @@
TSendFragmentInstanceResp sendFragmentInstanceResp =
client.sendFragmentInstance(sendFragmentInstanceReq);
if (!sendFragmentInstanceResp.accepted) {
- logger.warn(sendFragmentInstanceResp.message);
+ LOGGER.warn(sendFragmentInstanceResp.message);
if (sendFragmentInstanceResp.isSetNeedRetry()
&& sendFragmentInstanceResp.isNeedRetry()) {
throw new RatisReadUnavailableException(sendFragmentInstanceResp.message);
@@ -330,7 +342,7 @@
TSendSinglePlanNodeResp sendPlanNodeResp =
client.sendBatchPlanNode(sendPlanNodeReq).getResponses().get(0);
if (!sendPlanNodeResp.accepted) {
- logger.warn(
+ LOGGER.warn(
"dispatch write failed. status: {}, code: {}, message: {}, node {}",
sendPlanNodeResp.status,
TSStatusCode.representOf(sendPlanNodeResp.status.code),
@@ -366,7 +378,7 @@
try {
dispatchRemoteHelper(instance, endPoint);
} catch (ClientManagerException | TException | RatisReadUnavailableException e) {
- logger.warn(
+ LOGGER.warn(
"can't execute request on node {}, error msg is {}, and we try to reconnect this node.",
endPoint,
ExceptionUtils.getRootCause(e).toString());
@@ -374,7 +386,7 @@
try {
dispatchRemoteHelper(instance, endPoint);
} catch (ClientManagerException | TException | RatisReadUnavailableException e1) {
- logger.warn(
+ LOGGER.warn(
"can't execute request on node {} in second try, error msg is {}.",
endPoint,
ExceptionUtils.getRootCause(e1).toString());
@@ -398,7 +410,7 @@
ConsensusGroupId.Factory.createFromTConsensusGroupId(
instance.getRegionReplicaSet().getRegionId());
} catch (Throwable t) {
- logger.warn("Deserialize ConsensusGroupId failed. ", t);
+ LOGGER.warn("Deserialize ConsensusGroupId failed. ", t);
throw new FragmentInstanceDispatchException(
RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR,
@@ -414,7 +426,7 @@
? readExecutor.execute(instance)
: readExecutor.execute(groupId, instance);
if (!readResult.isAccepted()) {
- logger.warn(readResult.getMessage());
+ LOGGER.warn(readResult.getMessage());
throw new FragmentInstanceDispatchException(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, readResult.getMessage()));
}
@@ -426,7 +438,7 @@
if (!writeResult.isAccepted()) {
// DO NOT LOG READ_ONLY ERROR
if (writeResult.getStatus().getCode() != TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
- logger.warn(
+ LOGGER.warn(
"write locally failed. TSStatus: {}, message: {}",
writeResult.getStatus(),
writeResult.getMessage());
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
index b92142e..958765e 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties
@@ -1849,3 +1849,21 @@
# Default value is -1, which means no limit.
# Datatype: int
# load_write_throughput_bytes_per_second=-1
+
+####################
+### Dispatch Retry Configuration
+####################
+
+# The maximum retry duration for remotely dispatched write requests, in milliseconds.
+# It only takes effect for remotely dispatched write requests, not for local dispatching or queries.
+# Set to 0 or a negative number to disable retrying of remotely dispatched write requests.
+# We sleep between retries: 100ms, 200ms, 400ms, 800ms and so on, until reaching 20,000ms; the sleeping time will not increase any further
+# effectiveMode: hot_reload
+# Datatype: long
+# write_request_remote_dispatch_max_retry_duration_in_ms=60000
+
+# Whether to retry for unknown errors.
+# Current unknown errors include EXECUTE_STATEMENT_ERROR(301) and INTERNAL_SERVER_ERROR(305)
+# effectiveMode: hot_reload
+# Datatype: boolean
+# enable_retry_for_unknown_error=false
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b78f7dc..1de7f33 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -280,6 +280,10 @@
private final Set<String> enabledKillPoints =
KillPoint.parseKillPoints(System.getProperty(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS));
+ private volatile boolean retryForUnknownErrors = false;
+
+ private volatile long remoteWriteMaxRetryDurationInMs = 60000;
+
CommonConfig() {
// Empty constructor
}
@@ -1210,4 +1214,20 @@
public Set<String> getEnabledKillPoints() {
return enabledKillPoints;
}
+
+ public boolean isRetryForUnknownErrors() {
+ return retryForUnknownErrors;
+ }
+
+ public void setRetryForUnknownErrors(boolean retryForUnknownErrors) {
+ this.retryForUnknownErrors = retryForUnknownErrors;
+ }
+
+ public long getRemoteWriteMaxRetryDurationInMs() {
+ return remoteWriteMaxRetryDurationInMs;
+ }
+
+ public void setRemoteWriteMaxRetryDurationInMs(long remoteWriteMaxRetryDurationInMs) {
+ this.remoteWriteMaxRetryDurationInMs = remoteWriteMaxRetryDurationInMs;
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5979a6f..3d42596 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -246,6 +246,8 @@
properties.getProperty(
"cluster_device_limit_threshold",
String.valueOf(config.getDeviceLimitThreshold()))));
+
+ loadRetryProperties(properties);
}
private void loadPipeProps(Properties properties) {
@@ -614,6 +616,20 @@
String.valueOf(config.getSubscriptionReadFileBufferSize()))));
}
+ public void loadRetryProperties(Properties properties) {
+ config.setRemoteWriteMaxRetryDurationInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "write_request_remote_dispatch_max_retry_duration_in_ms",
+ String.valueOf(config.getRemoteWriteMaxRetryDurationInMs()))));
+
+ config.setRetryForUnknownErrors(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_retry_for_unknown_error",
+ String.valueOf(config.isRetryForUnknownErrors()))));
+ }
+
public void loadGlobalConfig(TGlobalConfig globalConfig) {
config.setTimestampPrecision(globalConfig.timestampPrecision);
config.setTimePartitionInterval(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
index 1efbe62..14e3958 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/PerformanceOverviewMetrics.java
@@ -111,6 +111,7 @@
Metric.PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL.toString();
private static final String LOCAL_SCHEDULE = "local_scheduler";
private static final String REMOTE_SCHEDULE = "remote_scheduler";
+ private static final String REMOTE_RETRY_SLEEP = "remote_retry";
static {
metricInfoMap.put(
@@ -127,10 +128,18 @@
PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
Tag.STAGE.toString(),
REMOTE_SCHEDULE));
+ metricInfoMap.put(
+ REMOTE_RETRY_SLEEP,
+ new MetricInfo(
+ MetricType.TIMER,
+ PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
+ Tag.STAGE.toString(),
+ REMOTE_RETRY_SLEEP));
}
private Timer localScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer remoteScheduleTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer remoteRetrySleepTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
/** Record the time cost of local schedule. */
public void recordScheduleLocalCost(long costTimeInNanos) {
@@ -142,6 +151,11 @@
remoteScheduleTimer.updateNanos(costTimeInNanos);
}
+  /** Record the time spent sleeping between remote dispatch retries. */
+ public void recordRemoteRetrySleepCost(long costTimeInNanos) {
+ remoteRetrySleepTimer.updateNanos(costTimeInNanos);
+ }
+
// endregion
// region local schedule
@@ -327,6 +341,13 @@
MetricLevel.CORE,
Tag.STAGE.toString(),
REMOTE_SCHEDULE);
+ remoteRetrySleepTimer =
+ metricService.getOrCreateTimer(
+ PERFORMANCE_OVERVIEW_SCHEDULE_DETAIL,
+ MetricLevel.CORE,
+ Tag.STAGE.toString(),
+ REMOTE_RETRY_SLEEP);
+
// bind local schedule metrics
schemaValidateTimer =
metricService.getOrCreateTimer(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index be32bf5..97811bc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -40,11 +40,16 @@
private static final Set<Integer> NEED_RETRY = new HashSet<>();
+ private static final Set<Integer> UNKNOWN_ERRORS = new HashSet<>();
+
private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
static {
- NEED_RETRY.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
- NEED_RETRY.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ // UNKNOWN ERRORS
+ UNKNOWN_ERRORS.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ UNKNOWN_ERRORS.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+
+ // KNOWN ERRORS
NEED_RETRY.add(TSStatusCode.DISPATCH_ERROR.getStatusCode());
NEED_RETRY.add(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
NEED_RETRY.add(TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode());
@@ -213,14 +218,23 @@
int code = status.getCode();
if (code == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus subStatus : status.subStatus) {
+            // if any sub-status of MULTIPLE_ERROR does not need retry, we won't retry the whole
+            // request
if (subStatus == null
- || (subStatus.getCode() != OK.code && !NEED_RETRY.contains(subStatus.getCode()))) {
+ || (subStatus.getCode() != OK.code
+ && !needRetryHelperForSingleStatus(subStatus.getCode()))) {
return false;
}
}
return true;
} else {
- return NEED_RETRY.contains(code);
+ return needRetryHelperForSingleStatus(code);
}
}
+
+  // checks a single status code; MULTIPLE_ERROR(302) is decomposed by the caller
+ private static boolean needRetryHelperForSingleStatus(int statusCode) {
+ return NEED_RETRY.contains(statusCode)
+ || (COMMON_CONFIG.isRetryForUnknownErrors() && UNKNOWN_ERRORS.contains(statusCode));
+ }
}