[FLINK-28019][table] fix error when retract a staled record if state ttl enabled in RetractableTopNFunction
This closes #19996
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index da4d94d..b9d6b6d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -191,16 +191,7 @@
sortedMap.put(sortKey, count);
}
} else {
- if (sortedMap.isEmpty()) {
- if (lenient) {
- LOG.warn(STATE_CLEARED_WARN_MSG);
- } else {
- throw new RuntimeException(STATE_CLEARED_WARN_MSG);
- }
- } else {
- throw new RuntimeException(
- "Can not retract a non-existent record. This should never happen.");
- }
+ stateStaledErrorHandle();
}
if (!stateRemoved) {
@@ -231,10 +222,19 @@
private void processStateStaled(Iterator<Map.Entry<RowData, Long>> sortedMapIterator)
throws RuntimeException {
+ // Sync with dataState first
+ sortedMapIterator.remove();
+
+ stateStaledErrorHandle();
+ }
+
+ /**
+ * Handle state staled error by configured lenient option. If option is true, warning log only,
+ * otherwise a {@link RuntimeException} will be thrown.
+ */
+ private void stateStaledErrorHandle() {
// Skip the data if it's state is cleared because of state ttl.
if (lenient) {
- // Sync with dataState
- sortedMapIterator.remove();
LOG.warn(STATE_CLEARED_WARN_MSG);
} else {
throw new RuntimeException(STATE_CLEARED_WARN_MSG);
@@ -395,8 +395,12 @@
}
}
if (isInRankEnd(currentRank)) {
- // there is no enough elements in Top-N, emit DELETE message for the retract record.
- collectDelete(out, prevRow, currentRank);
+ if (!findsSortKey && null == prevRow) {
+ stateStaledErrorHandle();
+ } else {
+ // there is no enough elements in Top-N, emit DELETE message for the retract record.
+ collectDelete(out, prevRow, currentRank);
+ }
}
return findsSortKey;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index 20efa88..d0c3c7d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.runtime.operators.rank;
+import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -555,4 +557,40 @@
assertorWithRowNumber.assertOutputEquals(
"output wrong.", expectedOutput, testHarness.getOutput());
}
+
+ @Test
+ public void testRetractAnStaledRecordWithRowNumber() throws Exception {
+ StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
+ AbstractTopNFunction func =
+ new RetractableTopNFunction(
+ ttlConfig,
+ InternalTypeInfo.ofFields(
+ VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
+ comparableRecordComparator,
+ sortKeySelector,
+ RankType.ROW_NUMBER,
+ new ConstantRankRange(1, 2),
+ generatedEqualiser,
+ true,
+ true);
+
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+ testHarness.open();
+ testHarness.setStateTtlProcessingTime(0);
+ testHarness.processElement(insertRecord("a", 1L, 10));
+ testHarness.setStateTtlProcessingTime(1001);
+ testHarness.processElement(insertRecord("a", 2L, 11));
+ testHarness.processElement(deleteRecord("a", 1L, 10));
+ testHarness.close();
+
+ List<Object> expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord("a", 1L, 10, 1L));
+ expectedOutput.add(insertRecord("a", 2L, 11, 1L));
+ // the following delete record should not be sent because the left row is null which is
+ // illegal.
+ // -D{row1=null, row2=+I(1)};
+
+ assertorWithRowNumber.assertOutputEquals(
+ "output wrong.", expectedOutput, testHarness.getOutput());
+ }
}