[FLINK-24277][connector/kafka] Add configuration for committing offset on checkpoint and disable it if group ID is not specified
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index a94d7bd..b614c10 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -154,6 +154,7 @@
below for more details.
- ```register.consumer.metrics``` specifies whether to register metrics of KafkaConsumer in Flink
metric group
+- ```commit.offsets.on.checkpoint``` specifies whether to commit consumed offsets back to Kafka brokers on checkpoint
For configurations of KafkaConsumer, you can refer to
<a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka documentation</a>
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index a5d89b9..9a05089 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -214,4 +214,9 @@
props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
return config;
}
+
+ @VisibleForTesting
+ Configuration getConfiguration() {
+ return toConfiguration(props);
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index cd286ed..eb93683 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -41,6 +41,7 @@
import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* The builder class for {@link KafkaSource} to make it easier for the users to construct a {@link
@@ -429,8 +430,12 @@
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
true);
- maybeOverride(
- ConsumerConfig.GROUP_ID_CONFIG, "KafkaSource-" + new Random().nextLong(), false);
+ if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ LOG.warn(
+ "Offset commit on checkpoint is disabled because {} is not specified",
+ ConsumerConfig.GROUP_ID_CONFIG);
+ maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
+ }
maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
maybeOverride(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -443,10 +448,13 @@
"-1",
boundedness == Boundedness.BOUNDED);
- // If the client id prefix is not set, reuse the consumer group id as the client id prefix.
+ // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
+ // or generate a random one if the consumer group id is not specified.
maybeOverride(
KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
- props.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+ props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+ ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+ : "KafkaSource-" + new Random().nextLong(),
false);
}
@@ -481,5 +489,24 @@
"No subscribe mode is specified, "
+ "should be one of topics, topic pattern and partition set.");
checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
+ // Check consumer group ID
+ checkState(
+ props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
+ String.format(
+ "Property %s is required when offset commit is enabled",
+ ConsumerConfig.GROUP_ID_CONFIG));
+ }
+
+ private boolean offsetCommitEnabledManually() {
+ boolean autoCommit =
+ props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+ && Boolean.parseBoolean(
+ props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ boolean commitOnCheckpoint =
+ props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+ && Boolean.parseBoolean(
+ props.getProperty(
+ KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+ return autoCommit || commitOnCheckpoint;
}
}
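To make the builder semantics above concrete, a small sketch of the two build-time paths, assuming a placeholder broker and topic; the behavior follows sanityCheck() and the tests further down: building without a group ID succeeds (with offset commit disabled), while explicitly enabling commit without a group ID fails fast.
```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupIdValidationSketch {
    public static void main(String[] args) {
        // Without a group ID the source still builds: the builder logs a warning
        // and forces commit.offsets.on.checkpoint to false.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder broker
                        .setTopics("input-topic") // placeholder topic
                        .setDeserializer(
                                KafkaRecordDeserializationSchema.valueOnly(
                                        StringDeserializer.class))
                        .build();

        // Explicitly enabling offset commit without a group ID fails fast in
        // sanityCheck() with an IllegalStateException.
        try {
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("input-topic")
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
                    .build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // -> "Property group.id is required when offset commit is enabled"
        }
    }
}
```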
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
index e48804b..1a05833 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java
@@ -48,6 +48,12 @@
.withDescription(
"Whether to register metrics of KafkaConsumer into Flink metric group");
+ public static final ConfigOption<Boolean> COMMIT_OFFSETS_ON_CHECKPOINT =
+ ConfigOptions.key("commit.offsets.on.checkpoint")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to commit consuming offset on checkpoint.");
+
@SuppressWarnings("unchecked")
public static <T> T getOption(
Properties props, ConfigOption<?> configOption, Function<String, T> parser) {
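A small sketch of how the new option resolves from consumer Properties through the getOption helper shown above, assuming (as its usage in this patch implies) that it falls back to the option's default value when the property is unset:
```java
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

import java.util.Properties;

public class OptionLookupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Property unset: the lookup falls back to the option's default (true).
        boolean enabled =
                KafkaSourceOptions.getOption(
                        props,
                        KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT,
                        Boolean::parseBoolean);
        System.out.println(enabled); // true

        // Explicitly disabled through the consumer properties.
        props.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false");
        boolean disabled =
                KafkaSourceOptions.getOption(
                        props,
                        KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT,
                        Boolean::parseBoolean);
        System.out.println(disabled); // false
    }
}
```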
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 2bd2c31..d048230 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -353,17 +353,19 @@
Set<TopicPartition> partitionsStoppingAtCommitted) {
Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
stoppingOffsets.putAll(endOffset);
- consumer.committed(partitionsStoppingAtCommitted)
- .forEach(
- (tp, offsetAndMetadata) -> {
- Preconditions.checkNotNull(
- offsetAndMetadata,
- String.format(
- "Partition %s should stop at committed offset. "
- + "But there is no committed offset of this partition for group %s",
- tp, groupId));
- stoppingOffsets.put(tp, offsetAndMetadata.offset());
- });
+ if (!partitionsStoppingAtCommitted.isEmpty()) {
+ consumer.committed(partitionsStoppingAtCommitted)
+ .forEach(
+ (tp, offsetAndMetadata) -> {
+ Preconditions.checkNotNull(
+ offsetAndMetadata,
+ String.format(
+ "Partition %s should stop at committed offset. "
+ + "But there is no committed offset of this partition for group %s",
+ tp, groupId));
+ stoppingOffsets.put(tp, offsetAndMetadata.offset());
+ });
+ }
}
private void removeEmptySplits() {
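The new empty-set guard is what keeps a group-ID-less source working here: KafkaConsumer.committed() requires a configured group.id and throws InvalidGroupIdException otherwise, so the call has to be skipped when no partition stops at a committed offset. It also spares an unnecessary broker round trip in that case.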
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
index 79c8064..08f82b0 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java
@@ -26,6 +26,7 @@
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -55,6 +56,7 @@
private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit;
private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;
private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
+ private final boolean commitOffsetsOnCheckpoint;
public KafkaSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
@@ -67,6 +69,13 @@
this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
+ this.commitOffsetsOnCheckpoint =
+ config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+ if (!commitOffsetsOnCheckpoint) {
+ LOG.warn(
+ "Offset commit on checkpoint is disabled. "
+ + "Consuming offset will not be reported back to Kafka cluster.");
+ }
}
@Override
@@ -84,6 +93,10 @@
@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+ if (!commitOffsetsOnCheckpoint) {
+ return splits;
+ }
+
if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
offsetsToCommit.put(checkpointId, Collections.emptyMap());
} else {
@@ -108,6 +121,10 @@
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+ if (!commitOffsetsOnCheckpoint) {
+ return;
+ }
+
((KafkaSourceFetcherManager<T>) splitFetcherManager)
.commitOffsets(
offsetsToCommit.get(checkpointId),
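As a rough, self-contained sketch of the bookkeeping pattern these two overrides implement, with types simplified to String/Long; the real reader hands offsets to the fetcher manager for an asynchronous commit:
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Offsets are staged per checkpoint in snapshotState() and committed (then
// pruned) in notifyCheckpointComplete(); disabling the flag short-circuits both.
public class CommitBookkeepingSketch {
    private final SortedMap<Long, Map<String, Long>> offsetsToCommit =
            Collections.synchronizedSortedMap(new TreeMap<>());
    private final boolean commitOffsetsOnCheckpoint;

    CommitBookkeepingSketch(boolean commitOffsetsOnCheckpoint) {
        this.commitOffsetsOnCheckpoint = commitOffsetsOnCheckpoint;
    }

    void snapshotState(long checkpointId, Map<String, Long> currentOffsets) {
        if (!commitOffsetsOnCheckpoint) {
            return; // nothing staged, so nothing will ever be committed
        }
        offsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (!commitOffsetsOnCheckpoint) {
            return;
        }
        Map<String, Long> offsets = offsetsToCommit.get(checkpointId);
        // Stand-in for the async commit performed by the fetcher manager.
        System.out.println("Committing " + offsets + " for checkpoint " + checkpointId);
        // Drop this checkpoint and everything older once committed.
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }
}
```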
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
index 0de4bd3..c02232b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java
@@ -17,22 +17,103 @@
package org.apache.flink.connector.kafka.source;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.TestLogger;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/** Tests for {@link KafkaSourceBuilder}. */
public class KafkaSourceBuilderTest extends TestLogger {
@Test
+ public void testBuildSourceWithGroupId() {
+ final KafkaSource<String> kafkaSource = getBasicBuilder().setGroupId("groupId").build();
+ // Commit on checkpoint should be enabled by default
+ Assertions.assertTrue(
+ kafkaSource
+ .getConfiguration()
+ .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+ // Auto commit should be disabled by default
+ Assertions.assertFalse(
+ kafkaSource
+ .getConfiguration()
+ .get(
+ ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+ .booleanType()
+ .noDefaultValue()));
+ }
+
+ @Test
public void testBuildSourceWithoutGroupId() {
- new KafkaSourceBuilder<String>()
+ final KafkaSource<String> kafkaSource = getBasicBuilder().build();
+ // Commit on checkpoint and auto commit should be disabled because group.id is not specified
+ Assertions.assertFalse(
+ kafkaSource
+ .getConfiguration()
+ .get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT));
+ Assertions.assertFalse(
+ kafkaSource
+ .getConfiguration()
+ .get(
+ ConfigOptions.key(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+ .booleanType()
+ .noDefaultValue()));
+ }
+
+ @Test
+ public void testEnableCommitOnCheckpointWithoutGroupId() {
+ final IllegalStateException exception =
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () ->
+ getBasicBuilder()
+ .setProperty(
+ KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
+ .key(),
+ "true")
+ .build());
+ MatcherAssert.assertThat(
+ exception.getMessage(),
+ CoreMatchers.containsString(
+ "Property group.id is required when offset commit is enabled"));
+ }
+
+ @Test
+ public void testEnableAutoCommitWithoutGroupId() {
+ final IllegalStateException exception =
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () ->
+ getBasicBuilder()
+ .setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+ .build());
+ MatcherAssert.assertThat(
+ exception.getMessage(),
+ CoreMatchers.containsString(
+ "Property group.id is required when offset commit is enabled"));
+ }
+
+ @Test
+ public void testDisableOffsetCommitWithoutGroupId() {
+ getBasicBuilder()
+ .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
+ .build();
+ getBasicBuilder().setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
+ }
+
+ private KafkaSourceBuilder<String> getBasicBuilder() {
+ return new KafkaSourceBuilder<String>()
.setBootstrapServers("testServer")
.setTopics("topic")
.setDeserializer(
- KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
- .build();
+ KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
}
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 5cb7074..47891aa4 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -219,6 +219,27 @@
source, WatermarkStrategy.noWatermarks(), "testRedundantParallelism");
executeAndVerify(env, stream);
}
+
+ @Test
+ public void testBasicReadWithoutGroupId() throws Exception {
+ KafkaSource<PartitionAndValue> source =
+ KafkaSource.<PartitionAndValue>builder()
+ .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setBounded(OffsetsInitializer.latest())
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ DataStream<PartitionAndValue> stream =
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "testBasicReadWithoutGroupId");
+ executeAndVerify(env, stream);
+ }
}
/** Integration test based on connector testing framework. */
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index f4e3fbd..09652ea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -25,6 +25,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -64,6 +65,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -250,6 +252,32 @@
}
@Test
+ public void testDisableOffsetCommit() throws Exception {
+ final Properties properties = new Properties();
+ properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false");
+ try (KafkaSourceReader<Integer> reader =
+ (KafkaSourceReader<Integer>)
+ createReader(
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ new TestingReaderContext(),
+ (ignore) -> {},
+ properties)) {
+ reader.addSplits(
+ getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED));
+ ValidatingSourceOutput output = new ValidatingSourceOutput();
+ long checkpointId = 0;
+ do {
+ checkpointId++;
+ reader.pollNext(output);
+ // Create a checkpoint for each polled message, but do not complete any of them.
+ reader.snapshotState(checkpointId);
+ // Offsets to commit should always be empty because offset commit is disabled
+ assertEquals(0, reader.getOffsetsToCommit().size());
+ } while (output.count() < totalNumRecords);
+ }
+ }
+
+ @Test
public void testKafkaSourceMetrics() throws Exception {
final MetricListener metricListener = new MetricListener();
final String groupId = "testKafkaSourceMetrics";
@@ -405,6 +433,17 @@
SourceReaderContext context,
Consumer<Collection<String>> splitFinishedHook)
throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ return createReader(boundedness, context, splitFinishedHook, properties);
+ }
+
+ private SourceReader<Integer, KafkaPartitionSplit> createReader(
+ Boundedness boundedness,
+ SourceReaderContext context,
+ Consumer<Collection<String>> splitFinishedHook,
+ Properties props)
+ throws Exception {
KafkaSourceBuilder<Integer> builder =
KafkaSource.<Integer>builder()
.setClientIdPrefix("KafkaSourceReaderTest")
@@ -415,9 +454,8 @@
.setProperty(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaSourceTestEnv.brokerConnectionStrings)
- .setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
- .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
+ .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ .setProperties(props);
if (boundedness == Boundedness.BOUNDED) {
builder.setBounded(OffsetsInitializer.latest());
}