[FLINK-24277][connector/kafka] Add configuration for committing offset on checkpoint and disable it if group ID is not specified
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index a94d7bd..b614c10 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -154,6 +154,7 @@
   below for more details.
 - ```register.consumer.metrics``` specifies whether to register metrics of KafkaConsumer in Flink
 metric group
+- ```commit.offsets.on.checkpoint``` specifies whether to commit consuming offsets to Kafka brokers on checkpoint
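+
+For example, a minimal sketch of disabling the offset commit via the builder (broker address,
+topic and group ID below are placeholders):
+
+```java
+KafkaSource<String> source = KafkaSource.<String>builder()
+        .setBootstrapServers("brokers:9092")
+        .setTopics("input-topic")
+        .setGroupId("my-group")
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+        .setProperty("commit.offsets.on.checkpoint", "false")
+        .build();
+```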
 
 For configurations of KafkaConsumer, you can refer to
 <a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka documentation</a>
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index a5d89b9..9a05089 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -214,4 +214,9 @@
         props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
         return config;
     }
+
+    @VisibleForTesting
+    Configuration getConfiguration() {
+        return toConfiguration(props);
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index cd286ed..eb93683 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -41,6 +41,7 @@
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
 * The builder class for {@link KafkaSource} to make it easier for the users to construct a {@link
@@ -429,8 +430,12 @@
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class.getName(),
                 true);
-        maybeOverride(
-                ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new Random().nextLong(), false);
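+        // Offset commit requires a consumer group, so disable committing on checkpoint
+        // when no group.id is configured.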
+        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled because {} is not specified",
+                    ConsumerConfig.GROUP_ID_CONFIG);
+            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
+        }
         maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
         maybeOverride(
                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -443,10 +448,13 @@
                 "-1",
                 boundedness == Boundedness.BOUNDED);
 
-        // If the client id prefix is not set, reuse the consumer group id as the client id prefix.
+        // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
+        // or generate a random string if the consumer group id is not specified.
         maybeOverride(
                 KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
-                props.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+                        ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+                        : "KafkaSource-" + new Random().nextLong(),
                 false);
     }
 
@@ -481,5 +489,24 @@
                 "No subscribe mode is specified, "
                         + "should be one of topics, topic pattern and partition set.");
         checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
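+    /**
+     * Returns whether the user has explicitly enabled offset committing, either through Kafka's
+     * enable.auto.commit or through commit.offsets.on.checkpoint.
+     */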
+    private boolean offsetCommitEnabledManually() {
+        boolean autoCommit =
+                props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                        && Boolean.parseBoolean(
+                                props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        boolean commitOnCheckpoint =
+                props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+                        && Boolean.parseBoolean(
+                                props.getProperty(
+                                        KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+        return autoCommit || commitOnCheckpoint;
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index e48804b..1a05833 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -48,6 +48,12 @@
                     .withDescription(
                             "Whether to register metrics of KafkaConsumer into Flink metric group");
 
+    public static final ConfigOption<Boolean> COMMIT_OFFSETS_ON_CHECKPOINT =
+            ConfigOptions.key("commit.offsets.on.checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to commit consuming offset on checkpoint.");
+
     @SuppressWarnings("unchecked")
     public static <T> T getOption(
             Properties props, ConfigOption<?> configOption, Function<String, T> parser) {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 2bd2c31..d048230 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -353,17 +353,19 @@
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at committed offset. "
-                                                    + "But there is no committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, offsetAndMetadata.offset());
-                        });
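+        // Only look up committed offsets when there is something to look up. Calling
+        // KafkaConsumer#committed would fail if no group.id is configured.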
+        if (!partitionsStoppingAtCommitted.isEmpty()) {
+            consumer.committed(partitionsStoppingAtCommitted)
+                    .forEach(
+                            (tp, offsetAndMetadata) -> {
+                                Preconditions.checkNotNull(
+                                        offsetAndMetadata,
+                                        String.format(
+                                                "Partition %s should stop at committed offset. "
+                                                        + "But there is no committed offset of this partition for group %s",
+                                                tp, groupId));
+                                stoppingOffsets.put(tp, offsetAndMetadata.offset());
+                            });
+        }
     }
 
     private void removeEmptySplits() {
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 79c8064..08f82b0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -26,6 +26,7 @@
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
 import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -55,6 +56,7 @@
     private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit;
     private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;
     private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
+    private final boolean commitOffsetsOnCheckpoint;
 
     public KafkaSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
@@ -67,6 +69,13 @@
         this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
         this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
+        this.commitOffsetsOnCheckpoint =
+                config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+        if (!commitOffsetsOnCheckpoint) {
+            LOG.warn(
+                    "Offset commit on checkpoint is disabled. "
+                            + "Consuming offset will not be reported back to Kafka cluster.");
+        }
     }
 
     @Override
@@ -84,6 +93,10 @@
     @Override
     public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
         List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return splits;
+        }
+
         if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
             offsetsToCommit.put(checkpointId, Collections.emptyMap());
         } else {
@@ -108,6 +121,10 @@
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+        if (!commitOffsetsOnCheckpoint) {
+            return;
+        }
+
         ((KafkaSourceFetcherManager<T>) splitFetcherManager)
                 .commitOffsets(
                         offsetsToCommit.get(checkpointId),
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index 0de4bd3..c02232b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -17,22 +17,103 @@
 
 package org.apache.flink.connector.kafka.source;
 
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 /** Tests for {@link KafkaSourceBuilder}. */
 public class KafkaSourceBuilderTest extends TestLogger {
 
     @Test
+    public void testBuildSourceWithGroupId() {
+        final KafkaSource<String> kafkaSource = getBasicBuilder().setGroupId("groupId").build();
+        // Commit on checkpoint should be enabled by default
+        Assertions.assertTrue(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        // Auto commit should be disabled by default
+        Assertions.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
     public void testBuildSourceWithoutGroupId() {
-        new KafkaSourceBuilder<String>()
+        final KafkaSource<String> kafkaSource = getBasicBuilder().build();
+        // Commit on checkpoint and auto commit should be disabled because group.id is not specified
+        Assertions.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+        Assertions.assertFalse(
+                kafkaSource
+                        .getConfiguration()
+                        .get(
+                                ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+                                        .booleanType()
+                                        .noDefaultValue()));
+    }
+
+    @Test
+    public void testEnableCommitOnCheckpointWithoutGroupId() {
+        final IllegalStateException exception =
+                Assert.assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
+                                                        .key(),
+                                                "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is enabled"));
+    }
+
+    @Test
+    public void testEnableAutoCommitWithoutGroupId() {
+        final IllegalStateException exception =
+                Assert.assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                getBasicBuilder()
+                                        .setProperty(
+                                                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+                                        .build());
+        MatcherAssert.assertThat(
+                exception.getMessage(),
+                CoreMatchers.containsString(
+                        "Property group.id is required when offset commit is enabled"));
+    }
+
+    @Test
+    public void testDisableOffsetCommitWithoutGroupId() {
+        getBasicBuilder()
+                .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
+                .build();
+        getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
+    }
+
+    private KafkaSourceBuilder<String> getBasicBuilder() {
+        return new KafkaSourceBuilder<String>()
                 .setBootstrapServers("testServer")
                 .setTopics("topic")
                 .setDeserializer(
-                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
-                .build();
+                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 5cb7074..47891aa4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -219,6 +219,27 @@
                             source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
             executeAndVerify(env, stream);
         }
+
+        @Test
+        public void testBasicReadWithoutGroupId() throws Exception {
+            KafkaSource<PartitionAndValue> source =
+                    KafkaSource.<PartitionAndValue>builder()
+                            .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+                            .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+                            .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setBounded(OffsetsInitializer.latest())
+                            .build();
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            DataStream<PartitionAndValue> stream =
+                    env.fromSource(
+                            source,
+                            WatermarkStrategy.noWatermarks(),
+                            "testBasicReadWithoutGroupId");
+            executeAndVerify(env, stream);
+        }
     }
 
     /** Integration test based on connector testing framework. */
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index f4e3fbd..09652ea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -64,6 +65,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -250,6 +252,32 @@
     }
 
     @Test
+    public void testDisableOffsetCommit() throws Exception {
+        final Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false");
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                properties)) {
+            reader.addSplits(
+                    getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
+            ValidatingSourceOutput output = new ValidatingSourceOutput();
+            long checkpointId = 0;
+            do {
+                checkpointId++;
+                reader.pollNext(output);
+                // Create a checkpoint for each polled message, but do not complete any of them.
+                reader.snapshotState(checkpointId);
+                // Offsets to commit should always be empty because offset commit is disabled
+                assertEquals(0, reader.getOffsetsToCommit().size());
+            } while (output.count() < totalNumRecords);
+        }
+    }
+
+    @Test
     public void testKafkaSourceMetrics() throws Exception {
         final MetricListener metricListener = new MetricListener();
         final String groupId = "testKafkaSourceMetrics";
@@ -405,6 +433,17 @@
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook)
             throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        return createReader(boundedness, context, splitFinishedHook, properties);
+    }
+
+    private SourceReader<Integer, KafkaPartitionSplit> createReader(
+            Boundedness boundedness,
+            SourceReaderContext context,
+            Consumer<Collection<String>> splitFinishedHook,
+            Properties props)
+            throws Exception {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
                         .setClientIdPrefix("KafkaSourceReaderTest")
@@ -415,9 +454,8 @@
                         .setProperty(
                                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                 KafkaSourceTestEnv.brokerConnectionStrings)
-                        .setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
+                        .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+                        .setProperties(props);
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }