[FLINK-28019][table] fix error when retract a staled record if state ttl enabled in RetractableTopNFunction

This closes #19996
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
index da4d94d..b9d6b6d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunction.java
@@ -191,16 +191,7 @@
                     sortedMap.put(sortKey, count);
                 }
             } else {
-                if (sortedMap.isEmpty()) {
-                    if (lenient) {
-                        LOG.warn(STATE_CLEARED_WARN_MSG);
-                    } else {
-                        throw new RuntimeException(STATE_CLEARED_WARN_MSG);
-                    }
-                } else {
-                    throw new RuntimeException(
-                            "Can not retract a non-existent record. This should never happen.");
-                }
+                stateStaledErrorHandle();
             }
 
             if (!stateRemoved) {
@@ -231,10 +222,19 @@
 
     private void processStateStaled(Iterator<Map.Entry<RowData, Long>> sortedMapIterator)
             throws RuntimeException {
+        // Sync with dataState first
+        sortedMapIterator.remove();
+
+        stateStaledErrorHandle();
+    }
+
+    /**
+     * Handle state staled error by configured lenient option. If option is true, warning log only,
+     * otherwise a {@link RuntimeException} will be thrown.
+     */
+    private void stateStaledErrorHandle() {
         // Skip the data if it's state is cleared because of state ttl.
         if (lenient) {
-            // Sync with dataState
-            sortedMapIterator.remove();
             LOG.warn(STATE_CLEARED_WARN_MSG);
         } else {
             throw new RuntimeException(STATE_CLEARED_WARN_MSG);
@@ -395,8 +395,12 @@
             }
         }
         if (isInRankEnd(currentRank)) {
-            // there is no enough elements in Top-N, emit DELETE message for the retract record.
-            collectDelete(out, prevRow, currentRank);
+            if (!findsSortKey && null == prevRow) {
+                stateStaledErrorHandle();
+            } else {
+                // there is no enough elements in Top-N, emit DELETE message for the retract record.
+                collectDelete(out, prevRow, currentRank);
+            }
         }
 
         return findsSortKey;
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
index 20efa88..d0c3c7d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/RetractableTopNFunctionTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.runtime.operators.rank;
 
+import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -555,4 +557,40 @@
         assertorWithRowNumber.assertOutputEquals(
                 "output wrong.", expectedOutput, testHarness.getOutput());
     }
+
+    @Test
+    public void testRetractAnStaledRecordWithRowNumber() throws Exception {
+        StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
+        AbstractTopNFunction func =
+                new RetractableTopNFunction(
+                        ttlConfig,
+                        InternalTypeInfo.ofFields(
+                                VarCharType.STRING_TYPE, new BigIntType(), new IntType()),
+                        comparableRecordComparator,
+                        sortKeySelector,
+                        RankType.ROW_NUMBER,
+                        new ConstantRankRange(1, 2),
+                        generatedEqualiser,
+                        true,
+                        true);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
+        testHarness.open();
+        testHarness.setStateTtlProcessingTime(0);
+        testHarness.processElement(insertRecord("a", 1L, 10));
+        testHarness.setStateTtlProcessingTime(1001);
+        testHarness.processElement(insertRecord("a", 2L, 11));
+        testHarness.processElement(deleteRecord("a", 1L, 10));
+        testHarness.close();
+
+        List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord("a", 1L, 10, 1L));
+        expectedOutput.add(insertRecord("a", 2L, 11, 1L));
+        // the following delete record should not be sent because the left row is null which is
+        // illegal.
+        // -D{row1=null, row2=+I(1)};
+
+        assertorWithRowNumber.assertOutputEquals(
+                "output wrong.", expectedOutput, testHarness.getOutput());
+    }
 }