[FLINK-24277][connector/kafka] Add OffsetsInitializerValidator interface for validating offset initializer in KafkaSourceBuilder
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index eb93683..d105cd8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -495,6 +496,13 @@
String.format(
"Property %s is required when offset commit is enabled",
ConsumerConfig.GROUP_ID_CONFIG));
+ // Validate the starting/stopping offsets initializers against the consumer properties
+ if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
+ ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props);
+ }
+ if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
+ ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
+ }
}
private boolean offsetCommitEnabledManually() {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
new file mode 100644
index 0000000..c198107
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator.initializer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Properties;
+
+/**
+ * Interface for validating {@link OffsetsInitializer} with properties from {@link
+ * org.apache.flink.connector.kafka.source.KafkaSource}.
+ */
+@Internal
+public interface OffsetsInitializerValidator {
+
+ /**
+ * Validates this offsets initializer against the properties of the Kafka source.
+ *
+ * @param kafkaSourceProperties Properties of Kafka source
+ * @throws IllegalStateException if validation fails
+ */
+ void validate(Properties kafkaSourceProperties) throws IllegalStateException;
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index 1773d63..026320d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -20,12 +20,16 @@
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkState;
/**
* A initializer that initialize the partitions to the earliest / latest / last-committed offsets.
@@ -34,7 +38,7 @@
*
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
*/
-class ReaderHandledOffsetsInitializer implements OffsetsInitializer {
+class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
private static final long serialVersionUID = 172938052008787981L;
private final long startingOffset;
private final OffsetResetStrategy offsetResetStrategy;
@@ -65,4 +69,15 @@
public OffsetResetStrategy getAutoOffsetResetStrategy() {
return offsetResetStrategy;
}
+
+ @Override
+ public void validate(Properties kafkaSourceProperties) {
+ if (startingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+ checkState(
+ kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+ String.format(
+ "Property %s is required when using committed offset for offsets initializer",
+ ConsumerConfig.GROUP_ID_CONFIG));
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
index d3335de..5766a5f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
@@ -18,6 +18,9 @@
package org.apache.flink.connector.kafka.source.enumerator.initializer;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
@@ -27,6 +30,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkState;
/**
* An implementation of {@link OffsetsInitializer} which initializes the offsets of the partition
@@ -34,7 +40,7 @@
*
* <p>Package private and should be instantiated via {@link OffsetsInitializer}.
*/
-class SpecifiedOffsetsInitializer implements OffsetsInitializer {
+class SpecifiedOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
private static final long serialVersionUID = 1649702397250402877L;
private final Map<TopicPartition, Long> initialOffsets;
private final OffsetResetStrategy offsetResetStrategy;
@@ -85,4 +91,18 @@
public OffsetResetStrategy getAutoOffsetResetStrategy() {
return offsetResetStrategy;
}
+
+ @Override
+ public void validate(Properties kafkaSourceProperties) {
+ initialOffsets.forEach(
+ (tp, offset) -> {
+ if (offset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+ checkState(
+ kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+ String.format(
+ "Property %s is required because partition %s is initialized with committed offset",
+ ConsumerConfig.GROUP_ID_CONFIG, tp));
+ }
+ });
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index c02232b..af14e66 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -18,10 +18,13 @@
package org.apache.flink.connector.kafka.source;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.TestLogger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
@@ -29,6 +32,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
/** Tests for {@link KafkaSourceBuilder}. */
public class KafkaSourceBuilderTest extends TestLogger {
@@ -88,7 +94,7 @@
@Test
public void testEnableAutoCommitWithoutGroupId() {
final IllegalStateException exception =
- Assert.assertThrows(
+ Assertions.assertThrows(
IllegalStateException.class,
() ->
getBasicBuilder()
@@ -109,6 +115,53 @@
getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
}
+ @Test
+ public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
+ // Using OffsetsInitializer#committedOffsets as starting offsets
+ final IllegalStateException startingOffsetException =
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () ->
+ getBasicBuilder()
+ .setStartingOffsets(OffsetsInitializer.committedOffsets())
+ .build());
+ MatcherAssert.assertThat(
+ startingOffsetException.getMessage(),
+ CoreMatchers.containsString(
+ "Property group.id is required when using committed offset for offsets initializer"));
+
+ // Using OffsetsInitializer#committedOffsets as stopping offsets
+ final IllegalStateException stoppingOffsetException =
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () ->
+ getBasicBuilder()
+ .setBounded(OffsetsInitializer.committedOffsets())
+ .build());
+ MatcherAssert.assertThat(
+ stoppingOffsetException.getMessage(),
+ CoreMatchers.containsString(
+ "Property group.id is required when using committed offset for offsets initializer"));
+
+ // Using OffsetsInitializer#offsets to manually specify committed offset as starting offset
+ final IllegalStateException specificStartingOffsetException =
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () -> {
+ final Map<TopicPartition, Long> offsetMap = new HashMap<>();
+ offsetMap.put(
+ new TopicPartition("topic", 0),
+ KafkaPartitionSplit.COMMITTED_OFFSET);
+ getBasicBuilder()
+ .setStartingOffsets(OffsetsInitializer.offsets(offsetMap))
+ .build();
+ });
+ MatcherAssert.assertThat(
+ specificStartingOffsetException.getMessage(),
+ CoreMatchers.containsString(
+ "Property group.id is required because partition topic-0 is initialized with committed offset"));
+ }
+
private KafkaSourceBuilder<String> getBasicBuilder() {
return new KafkaSourceBuilder<String>()
.setBootstrapServers("testServer")