Fix improper retries in the dispatcher and measurements not being set to null when inserting an out-of-TTL row (#16297)
diff --git a/.gitignore b/.gitignore index 111f592..a489dd0 100644 --- a/.gitignore +++ b/.gitignore
@@ -123,3 +123,4 @@ # Develocity .mvn/.gradle-enterprise/ .mvn/.develocity/ +.run/
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java index 88b92db..c95b8cc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBSessionRelationalIT.java
@@ -27,12 +27,14 @@ import org.apache.iotdb.isession.ITableSession; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.TableClusterIT; import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.TableSessionBuilder; import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; @@ -1875,4 +1877,40 @@ EnvFactory.getEnv().initClusterEnvironment(); } } + + /** Issues the same SQL INSERT (time 10) against a table whose TTL property is set to 1, once through a dedicated session to every DataNode of a private cluster configured with data replication factor 2 and the IoTConsensus data-region protocol. A StatementExecutionException is tolerated only when its message contains the TTL-bound text checked in the catch clause; any other one is rethrown. The private cluster is always cleaned up in the finally block. NOTE(review): presumably a regression test for the dispatcher retry fix in this commit -- confirm. */ @Test + public void testSqlInsertWithExpiredTTL() + throws IoTDBConnectionException, StatementExecutionException { + /* Uses its own SimpleEnv instead of the shared EnvFactory environment. */ SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv.getConfig().getCommonConfig().setDataReplicationFactor(2); + simpleEnv + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + /* NOTE(review): arguments are presumably (configNodes, dataNodes) = (1, 3) -- confirm against SimpleEnv. */ simpleEnv.initClusterEnvironment(1, 3); + + try (ITableSession session = simpleEnv.getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test"); + session.executeNonQueryStatement("USE test"); + + session.executeNonQueryStatement("CREATE TABLE test_sql_ttl (s1 INT32)"); + session.executeNonQueryStatement("ALTER TABLE test_sql_ttl SET PROPERTIES TTL=1"); + + /* Connect to each DataNode individually so the insert is submitted through every node in turn. */ for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) { + TableSessionBuilder tableSessionBuilder = new TableSessionBuilder(); + tableSessionBuilder.nodeUrls( + Collections.singletonList(dataNodeWrapper.getIpAndPortString())); + tableSessionBuilder.database("test"); + try (ITableSession subSession = tableSessionBuilder.build()) { + /* NOTE(review): time 10 is presumably already outside the TTL window -- confirm the TTL unit. */ subSession.executeNonQueryStatement("INSERT INTO test_sql_ttl (time, s1) VALUES (10, 1)"); + } 
catch (StatementExecutionException e) { + /* Only the TTL-bound rejection is accepted; anything else fails the test. */ if (!e.getMessage().contains("is less than ttl time bound")) { + throw e; + } + } + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index d587e25..e190c27 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -62,10 +62,10 @@ @Override public void onComplete(TSyncLogEntriesRes response) { if (response.getStatuses().stream() - .anyMatch(status -> RetryUtils.needRetryForConsensus(status.getCode()))) { + .anyMatch(status -> RetryUtils.needRetryForWrite(status.getCode()))) { List<String> retryStatusMessages = response.getStatuses().stream() - .filter(status -> RetryUtils.needRetryForConsensus(status.getCode())) + .filter(status -> RetryUtils.needRetryForWrite(status.getCode())) .map(TSStatus::getMessage) .collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index a4f21d2..8065fb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -66,6 +67,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -284,6 +286,16 @@ dispatchWriteOnce(shouldDispatch); // 3. decide if we need retry (we may decide the retry condition instance-wise, if needed) + Iterator<FailedFragmentInstanceWithStatus> iterator = failedInstances.iterator(); + while (iterator.hasNext()) { + FailedFragmentInstanceWithStatus failedFragmentInstanceWithStatus = iterator.next(); + if (!RetryUtils.needRetryForWrite( + failedFragmentInstanceWithStatus.getFailureStatus().getCode())) { + dispatchFailures.add(failedFragmentInstanceWithStatus.getFailureStatus()); + iterator.remove(); + } + } + final boolean shouldRetry = !failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum > 1; if (!shouldRetry) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 4bb9bc2b..1997a99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3798,6 +3798,7 @@ DateTimeUtils.convertLongToDate( CommonDateTimeUtils.currentTime() - ttl)))); insertRowNode.setFailedMeasurementNumber(insertRowNode.getMeasurements().length); + insertRowNode.setMeasurements(null); continue; } // init map
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java index 779956b..94325e9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -96,7 +96,7 @@ public void handle( final TSStatus status, final String exceptionMessage, final String recordMessage) { - if (RetryUtils.needRetryForConsensus(status.getCode())) { + if (RetryUtils.needRetryForWrite(status.getCode())) { LOGGER.info("IoTConsensusV2: will retry with increasing interval. status: {}", status); throw new PipeConsensusRetryWithIncreasingIntervalException( exceptionMessage, Integer.MAX_VALUE);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java index 2bd4b7b..62b6563 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -30,7 +30,7 @@ T call() throws E; } - public static boolean needRetryForConsensus(int statusCode) { + public static boolean needRetryForWrite(int statusCode) { return statusCode == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode() || statusCode == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();