[FLINK-24277][connector/kafka] Add OffsetsInitializerValidator interface for validating offsets initializers in KafkaSourceBuilder
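
Offsets initializers that resolve to committed consumer offsets
(ReaderHandledOffsetsInitializer with COMMITTED_OFFSET, or
SpecifiedOffsetsInitializer entries pointing at COMMITTED_OFFSET)
require group.id to be configured. This change introduces the internal
OffsetsInitializerValidator interface and lets KafkaSourceBuilder
validate both the starting and the stopping offsets initializer against
the source properties during build(), so a missing group.id fails fast
with an IllegalStateException instead of surfacing only at runtime.

For illustration, a minimal sketch of a build that now fails at
build() time (broker address and topic name are placeholders; this
mirrors the new testUsingCommittedOffsetsInitializerWithoutGroupId
test):

    KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("my-topic")
            .setDeserializer(
                    KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
            // Committed offsets require group.id; none is set here,
            // so build() throws IllegalStateException.
            .setStartingOffsets(OffsetsInitializer.committedOffsets())
            .build();
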
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index eb93683..d105cd8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
@@ -495,6 +496,13 @@
                 String.format(
                         "Property %s is required when offset commit is enabled",
                         ConsumerConfig.GROUP_ID_CONFIG));
+        // Check offsets initializers
+        if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
+            ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props);
+        }
+        if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
+            ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
+        }
     }
 
     private boolean offsetCommitEnabledManually() {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
new file mode 100644
index 0000000..c198107
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerValidator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator.initializer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Properties;
+
+/**
+ * Interface for validating {@link OffsetsInitializer} with properties from {@link
+ * org.apache.flink.connector.kafka.source.KafkaSource}.
+ */
+@Internal
+public interface OffsetsInitializerValidator {
+
+    /**
+     * Validate offsets initializer with properties of Kafka source.
+     *
+     * @param kafkaSourceProperties Properties of Kafka source
+     * @throws IllegalStateException if validation fails
+     */
+    void validate(Properties kafkaSourceProperties) throws IllegalStateException;
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index 1773d63..026320d 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -20,12 +20,16 @@
 
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An initializer that initializes the partitions to the earliest / latest / last-committed offsets.
@@ -34,7 +38,7 @@
  *
  * <p>Package private and should be instantiated via {@link OffsetsInitializer}.
  */
-class ReaderHandledOffsetsInitializer implements OffsetsInitializer {
+class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
     private static final long serialVersionUID = 172938052008787981L;
     private final long startingOffset;
     private final OffsetResetStrategy offsetResetStrategy;
@@ -65,4 +69,15 @@
     public OffsetResetStrategy getAutoOffsetResetStrategy() {
         return offsetResetStrategy;
     }
+
+    @Override
+    public void validate(Properties kafkaSourceProperties) {
+        if (startingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+            checkState(
+                    kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+                    String.format(
+                            "Property %s is required when using committed offset for offsets initializer",
+                            ConsumerConfig.GROUP_ID_CONFIG));
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
index d3335de..5766a5f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.initializer;
 
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
@@ -27,6 +30,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An implementation of {@link OffsetsInitializer} which initializes the offsets of the partition
@@ -34,7 +40,7 @@
  *
  * <p>Package private and should be instantiated via {@link OffsetsInitializer}.
  */
-class SpecifiedOffsetsInitializer implements OffsetsInitializer {
+class SpecifiedOffsetsInitializer implements OffsetsInitializer, OffsetsInitializerValidator {
     private static final long serialVersionUID = 1649702397250402877L;
     private final Map<TopicPartition, Long> initialOffsets;
     private final OffsetResetStrategy offsetResetStrategy;
@@ -85,4 +91,18 @@
     public OffsetResetStrategy getAutoOffsetResetStrategy() {
         return offsetResetStrategy;
     }
+
+    @Override
+    public void validate(Properties kafkaSourceProperties) {
+        initialOffsets.forEach(
+                (tp, offset) -> {
+                    if (offset == KafkaPartitionSplit.COMMITTED_OFFSET) {
+                        checkState(
+                                kafkaSourceProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG),
+                                String.format(
+                                        "Property %s is required because partition %s is initialized with committed offset",
+                                        ConsumerConfig.GROUP_ID_CONFIG, tp));
+                    }
+                });
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index c02232b..af14e66 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -18,10 +18,13 @@
 package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
@@ -29,6 +32,9 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /** Tests for {@link KafkaSourceBuilder}. */
 public class KafkaSourceBuilderTest extends TestLogger {
 
@@ -88,7 +94,7 @@
     @Test
     public void testEnableAutoCommitWithoutGroupId() {
         final IllegalStateException exception =
-                Assert.assertThrows(
+                Assertions.assertThrows(
                         IllegalStateException.class,
                         () ->
                                 getBasicBuilder()
@@ -109,6 +115,53 @@
         getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
     }
 
+    @Test
+    public void testUsingCommittedOffsetsInitializerWithoutGroupId() {
+        // Using OffsetsInitializer#committedOffsets as starting offsets
+        final IllegalStateException startingOffsetException =
+                Assertions.assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setStartingOffsets(OffsetsInitializer.committedOffsets())
+                                        .build());
+        MatcherAssert.assertThat(
+                startingOffsetException.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when using committed offset for offsets initializer"));
+
+        // Using OffsetsInitializer#committedOffsets as stopping offsets
+        final IllegalStateException stoppingOffsetException =
+                Assertions.assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setBounded(OffsetsInitializer.committedOffsets())
+                                        .build());
+        MatcherAssert.assertThat(
+                stoppingOffsetException.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when using committed offset for offsets initializer"));
+
+        // Using OffsetsInitializer#offsets to manually specify committed offset as starting offset
+        final IllegalStateException specificStartingOffsetException =
+                Assertions.assertThrows(
+                        IllegalStateException.class,
+                        () -> {
+                            final Map<TopicPartition, Long> offsetMap = new HashMap<>();
+                            offsetMap.put(
+                                    new TopicPartition("topic", 0),
+                                    KafkaPartitionSplit.COMMITTED_OFFSET);
+                            getBasicBuilder()
+                                    .setStartingOffsets(OffsetsInitializer.offsets(offsetMap))
+                                    .build();
+                        });
+        MatcherAssert.assertThat(
+                specificStartingOffsetException.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required because partition topic-0 is initialized with committed offset"));
+    }
+
     private KafkaSourceBuilder<String> getBasicBuilder() {
         return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")