[FLINK-31319][connectors/kafka] Fix incorrect partitionDiscoveryIntervalMs condition check in the new Kafka source that prevented bounded sources from quitting
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 9cf233c..137f420 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -298,7 +298,7 @@
if (t != null) {
throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t);
}
- if (partitionDiscoveryIntervalMs < 0) {
+ if (partitionDiscoveryIntervalMs <= 0) {
LOG.debug("Partition discovery is disabled.");
noMoreNewPartitionSplits = true;
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 8c39b54..8d0d3fc 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -98,6 +98,10 @@
assertThat(context.getOneTimeCallables())
.as("A one time partition discovery callable should have been scheduled")
.hasSize(1);
+
+ // right after the enumerator starts, noMoreNewPartitionSplits should be false
+ assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+ .isFalse();
}
}
@@ -115,6 +119,10 @@
assertThat(context.getPeriodicCallables())
.as("A periodic partition discovery callable should have been scheduled")
.hasSize(1);
+
+ // right after the enumerator starts, noMoreNewPartitionSplits should be false
+ assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+ .isFalse();
}
}
@@ -166,6 +174,78 @@
}
}
+ @Test
+ public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable {
+ try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+ new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+ KafkaSourceEnumerator enumerator =
+ createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+ // Start the enumerator and it should schedule a one time task to discover and assign
+ // partitions.
+ enumerator.start();
+ assertThat(context.getOneTimeCallables())
+ .as("A one time partition discovery callable should have been scheduled")
+ .hasSize(1);
+ assertThat(context.getPeriodicCallables()).isEmpty();
+ // Run the partition discover callable and check the partition assignment.
+ runOneTimePartitionDiscovery(context);
+
+ // noMoreNewPartitionSplits starts as false; once handlePartitionSplitChanges
+ // runs with one-time discovery, it should be set to true
+ assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+ .isTrue();
+ }
+ }
+
+ @Test
+ public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable {
+ try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+ new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+ KafkaSourceEnumerator enumerator =
+ createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+
+ // Start the enumerator and it should schedule a periodic task to discover and assign
+ // partitions.
+ enumerator.start();
+ assertThat(context.getOneTimeCallables()).isEmpty();
+ assertThat(context.getPeriodicCallables())
+ .as("A periodic partition discovery callable should have been scheduled")
+ .hasSize(1);
+ // Run the partition discover callable and check the partition assignment.
+ runPeriodicPartitionDiscovery(context);
+
+ // noMoreNewPartitionSplits starts as false; with periodic discovery enabled,
+ // it should remain false even after handlePartitionSplitChanges runs
+ assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+ .isFalse();
+ }
+ }
+
+ @Test
+ public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable {
+ try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+ new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+ // set partitionDiscoveryIntervalMs = 0
+ KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) {
+
+ // Start the enumerator, and it should schedule a one time task to discover and assign
+ // partitions.
+ enumerator.start();
+ assertThat(context.getOneTimeCallables())
+ .as("A one time partition discovery callable should have been scheduled")
+ .hasSize(1);
+ assertThat(context.getPeriodicCallables()).isEmpty();
+ // Run the partition discover callable and check the partition assignment.
+ runOneTimePartitionDiscovery(context);
+
+ // noMoreNewPartitionSplits starts as false; with interval 0 (discovery disabled),
+ // handlePartitionSplitChanges should set it to true
+ assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits"))
+ .isTrue();
+ }
+ }
+
@Test(timeout = 30000L)
public void testDiscoverPartitionsPeriodically() throws Throwable {
try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
@@ -261,7 +341,7 @@
KafkaSourceEnumerator enumerator =
createEnumerator(
context2,
- ENABLE_PERIODIC_PARTITION_DISCOVERY,
+ ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1,
PRE_EXISTING_TOPICS,
preexistingAssignments,
new Properties())) {
@@ -290,7 +370,7 @@
KafkaSourceEnumerator enumerator =
createEnumerator(
context,
- ENABLE_PERIODIC_PARTITION_DISCOVERY,
+ ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1,
PRE_EXISTING_TOPICS,
Collections.emptySet(),
properties)) {
@@ -405,6 +485,12 @@
private KafkaSourceEnumerator createEnumerator(
MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+ long partitionDiscoveryIntervalMs) {
+ return createEnumerator(enumContext, partitionDiscoveryIntervalMs, EXCLUDE_DYNAMIC_TOPIC);
+ }
+
+ private KafkaSourceEnumerator createEnumerator(
+ MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
boolean enablePeriodicPartitionDiscovery,
boolean includeDynamicTopic) {
List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS);
@@ -413,7 +499,23 @@
}
return createEnumerator(
enumContext,
- enablePeriodicPartitionDiscovery,
+ enablePeriodicPartitionDiscovery ? 1 : -1,
+ topics,
+ Collections.emptySet(),
+ new Properties());
+ }
+
+ private KafkaSourceEnumerator createEnumerator(
+ MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+ long partitionDiscoveryIntervalMs,
+ boolean includeDynamicTopic) {
+ List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS);
+ if (includeDynamicTopic) {
+ topics.add(DYNAMIC_TOPIC_NAME);
+ }
+ return createEnumerator(
+ enumContext,
+ partitionDiscoveryIntervalMs,
topics,
Collections.emptySet(),
new Properties());
@@ -425,7 +527,7 @@
*/
private KafkaSourceEnumerator createEnumerator(
MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext,
- boolean enablePeriodicPartitionDiscovery,
+ long partitionDiscoveryIntervalMs,
Collection<String> topicsToSubscribe,
Set<TopicPartition> assignedPartitions,
Properties overrideProperties) {
@@ -442,10 +544,9 @@
Properties props =
new Properties(KafkaSourceTestEnv.getConsumerProperties(StringDeserializer.class));
KafkaSourceEnumerator.deepCopyProperties(overrideProperties, props);
- String partitionDiscoverInterval = enablePeriodicPartitionDiscovery ? "1" : "-1";
props.setProperty(
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
- partitionDiscoverInterval);
+ String.valueOf(partitionDiscoveryIntervalMs));
return new KafkaSourceEnumerator(
subscriber,