Fix improper retries in dispatcher and measurements not being set to null when inserting an out-of-TTL row (#16297)

diff --git a/.gitignore b/.gitignore
index 111f592..a489dd0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,4 @@
 # Develocity
 .mvn/.gradle-enterprise/
 .mvn/.develocity/
+.run/
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
index 88b92db..c95b8cc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -27,12 +27,14 @@
 import org.apache.iotdb.isession.ITableSession;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.TableClusterIT;
 import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
 
 import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
@@ -1875,4 +1877,40 @@
       EnvFactory.getEnv().initClusterEnvironment();
     }
   }
+
+  @Test
+  public void testSqlInsertWithExpiredTTL()
+      throws IoTDBConnectionException, StatementExecutionException {
+    SimpleEnv simpleEnv = new SimpleEnv(); // separate env, not EnvFactory.getEnv()
+    simpleEnv.getConfig().getCommonConfig().setDataReplicationFactor(2); // 2 data replicas
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    simpleEnv.initClusterEnvironment(1, 3); // presumably 1 ConfigNode, 3 DataNodes - confirm
+
+    try (ITableSession session = simpleEnv.getTableSessionConnection()) {
+      session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test");
+      session.executeNonQueryStatement("USE test");
+
+      session.executeNonQueryStatement("CREATE TABLE test_sql_ttl (s1 INT32)");
+      session.executeNonQueryStatement("ALTER TABLE test_sql_ttl SET PROPERTIES TTL=1");
+
+      for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
+        TableSessionBuilder tableSessionBuilder = new TableSessionBuilder();
+        tableSessionBuilder.nodeUrls( // connect to this DataNode only
+            Collections.singletonList(dataNodeWrapper.getIpAndPortString()));
+        tableSessionBuilder.database("test");
+        try (ITableSession subSession = tableSessionBuilder.build()) {
+          subSession.executeNonQueryStatement("INSERT INTO test_sql_ttl (time, s1) VALUES (10, 1)");
+        } catch (StatementExecutionException e) { // only the TTL rejection is tolerated
+          if (!e.getMessage().contains("is less than ttl time bound")) {
+            throw e; // any other failure fails the test
+          }
+        }
+      }
+    } finally { // always tear down the cluster started above
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index d587e25..e190c27 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -62,10 +62,10 @@
   @Override
   public void onComplete(TSyncLogEntriesRes response) {
     if (response.getStatuses().stream()
-        .anyMatch(status -> RetryUtils.needRetryForConsensus(status.getCode()))) {
+        .anyMatch(status -> RetryUtils.needRetryForWrite(status.getCode()))) {
       List<String> retryStatusMessages =
           response.getStatuses().stream()
-              .filter(status -> RetryUtils.needRetryForConsensus(status.getCode()))
+              .filter(status -> RetryUtils.needRetryForWrite(status.getCode()))
               .map(TSStatus::getMessage)
               .collect(Collectors.toList());
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a4f21d2..8065fb2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -30,6 +30,7 @@
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -66,6 +67,7 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -284,6 +286,16 @@
           dispatchWriteOnce(shouldDispatch);
 
       // 3. decide if we need retry (we may decide the retry condition instance-wise, if needed)
+      Iterator<FailedFragmentInstanceWithStatus> iterator = failedInstances.iterator();
+      while (iterator.hasNext()) {
+        FailedFragmentInstanceWithStatus failedFragmentInstanceWithStatus = iterator.next();
+        if (!RetryUtils.needRetryForWrite(
+            failedFragmentInstanceWithStatus.getFailureStatus().getCode())) {
+          dispatchFailures.add(failedFragmentInstanceWithStatus.getFailureStatus());
+          iterator.remove();
+        }
+      }
+
       final boolean shouldRetry =
           !failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum > 1;
       if (!shouldRetry) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 4bb9bc2b..1997a99 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3798,6 +3798,7 @@
                           DateTimeUtils.convertLongToDate(
                               CommonDateTimeUtils.currentTime() - ttl))));
           insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length);
+          insertRowNode.setMeasurements(null);
           continue;
         }
         // init map
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 779956b..94325e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -96,7 +96,7 @@
   public void handle(
       final TSStatus status, final String exceptionMessage, final String recordMessage) {
 
-    if (RetryUtils.needRetryForConsensus(status.getCode())) {
+    if (RetryUtils.needRetryForWrite(status.getCode())) {
       LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status);
       throw new PipeConsensusRetryWithIncreasingIntervalException(
           exceptionMessage, Integer.MAX_VALUE);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index 2bd4b7b..62b6563 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -30,7 +30,7 @@
     T call() throws E;
   }
 
-  public static boolean needRetryForConsensus(int statusCode) {
+  public static boolean needRetryForWrite(int statusCode) {
     return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()
         || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
         || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();