[FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs error condition check cause bounded source can not quit
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 9cf233c..137f420 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -298,7 +298,7 @@
         if (t != null) {
             throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t);
         }
-        if (partitionDiscoveryIntervalMs < 0) {
+        if (partitionDiscoveryIntervalMs <= 0) {
             LOG.debug("Partition discovery is disabled.");
             noMoreNewPartitionSplits = true;
         }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 8c39b54..8d0d3fc 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -98,6 +98,10 @@
             assertThat(context.getOneTimeCallables())
                     .as("A one time partition discovery callable should have been scheduled")
                     .hasSize(1);
+
+            // enumerator just start noMoreNewPartitionSplits will be false
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isFalse();
         }
     }
 
@@ -115,6 +119,10 @@
             assertThat(context.getPeriodicCallables())
                     .as("A periodic partition discovery callable should have been scheduled")
                     .hasSize(1);
+
+            // enumerator just start noMoreNewPartitionSplits will be false
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isFalse();
         }
     }
 
@@ -166,6 +174,78 @@
         }
     }
 
+    @Test
+    public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable {
+        try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                KafkaSourceEnumerator enumerator =
+                        createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            // Start the enumerator and it should schedule a one time task to discover and assign
+            // partitions.
+            enumerator.start();
+            assertThat(context.getOneTimeCallables())
+                    .as("A one time partition discovery callable should have been scheduled")
+                    .hasSize(1);
+            assertThat(context.getPeriodicCallables()).isEmpty();
+            // Run the partition discover callable and check the partition assignment.
+            runOneTimePartitionDiscovery(context);
+
+            // enumerator noMoreNewPartitionSplits first will be false, when execute
+            // handlePartitionSplitChanges will be set true
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isTrue();
+        }
+    }
+
+    @Test
+    public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable {
+        try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                KafkaSourceEnumerator enumerator =
+                        createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+            // Start the enumerator and it should schedule a one time task to discover and assign
+            // partitions.
+            enumerator.start();
+            assertThat(context.getOneTimeCallables()).isEmpty();
+            assertThat(context.getPeriodicCallables())
+                    .as("A periodic partition discovery callable should have been scheduled")
+                    .hasSize(1);
+            // Run the partition discover callable and check the partition assignment.
+            runPeriodicPartitionDiscovery(context);
+
+            // enumerator noMoreNewPartitionSplits first will be false, even when execute
+            // handlePartitionSplitChanges it still be false
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isFalse();
+        }
+    }
+
+    @Test
+    public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable {
+        try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+                // set partitionDiscoveryIntervalMs = 0
+                KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) {
+
+            // Start the enumerator, and it should schedule a one time task to discover and assign
+            // partitions.
+            enumerator.start();
+            assertThat(context.getOneTimeCallables())
+                    .as("A one time partition discovery callable should have been scheduled")
+                    .hasSize(1);
+            assertThat(context.getPeriodicCallables()).isEmpty();
+            // Run the partition discover callable and check the partition assignment.
+            runOneTimePartitionDiscovery(context);
+
+            // enumerator noMoreNewPartitionSplits first will be false, when execute
+            // handlePartitionSplitChanges will be set true
+            assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+                    .isTrue();
+        }
+    }
+
     @Test(timeout = 30000L)
     public void testDiscoverPartitionsPeriodically() throws Throwable {
         try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
@@ -261,7 +341,7 @@
                 KafkaSourceEnumerator enumerator =
                         createEnumerator(
                                 context2,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1,
                                 PRE_EXISTING_TOPICS,
                                 preexistingAssignments,
                                 new Properties())) {
@@ -290,7 +370,7 @@
                 KafkaSourceEnumerator enumerator =
                         createEnumerator(
                                 context,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY,
+                                ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1,
                                 PRE_EXISTING_TOPICS,
                                 Collections.emptySet(),
                                 properties)) {
@@ -405,6 +485,12 @@
 
     private KafkaSourceEnumerator createEnumerator(
             MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+            long partitionDiscoveryIntervalMs) {
+        return createEnumerator(enumContext, partitionDiscoveryIntervalMs, EXCLUDE_DYNAMIC_TOPIC);
+    }
+
+    private KafkaSourceEnumerator createEnumerator(
+            MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery,
             boolean includeDynamicTopic) {
         List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS);
@@ -413,7 +499,23 @@
         }
         return createEnumerator(
                 enumContext,
-                enablePeriodicPartitionDiscovery,
+                enablePeriodicPartitionDiscovery ? 1 : -1,
+                topics,
+                Collections.emptySet(),
+                new Properties());
+    }
+
+    private KafkaSourceEnumerator createEnumerator(
+            MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+            long partitionDiscoveryIntervalMs,
+            boolean includeDynamicTopic) {
+        List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS);
+        if (includeDynamicTopic) {
+            topics.add(DYNAMIC_TOPIC_NAME);
+        }
+        return createEnumerator(
+                enumContext,
+                partitionDiscoveryIntervalMs,
                 topics,
                 Collections.emptySet(),
                 new Properties());
@@ -425,7 +527,7 @@
      */
     private KafkaSourceEnumerator createEnumerator(
             MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
-            boolean enablePeriodicPartitionDiscovery,
+            long partitionDiscoveryIntervalMs,
             Collection<String> topicsToSubscribe,
             Set<TopicPartition> assignedPartitions,
             Properties overrideProperties) {
@@ -442,10 +544,9 @@
         Properties props =
                 new Properties(KafkaSourceTestEnv.getConsumerProperties(StringDeserializer.class));
         KafkaSourceEnumerator.deepCopyProperties(overrideProperties, props);
-        String partitionDiscoverInterval = enablePeriodicPartitionDiscovery ? "1" : "-1";
         props.setProperty(
                 KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
-                partitionDiscoverInterval);
+                String.valueOf(partitionDiscoveryIntervalMs));
 
         return new KafkaSourceEnumerator(
                 subscriber,