| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.kafka.source; |
| |
| import org.apache.flink.api.common.serialization.DeserializationSchema; |
| import org.apache.flink.api.connector.source.Boundedness; |
| import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; |
| import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
| import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator; |
| import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; |
| import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; |
| |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.serialization.ByteArrayDeserializer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.regex.Pattern; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| import static org.apache.flink.util.Preconditions.checkState; |
| |
| /** |
| * The builder class for {@link KafkaSource} to make it easier for users to construct a {@link |
| * KafkaSource}. |
| * |
| * <p>The following example shows the minimum setup to create a KafkaSource that reads String |
| * values from Kafka topics. |
| * |
| * <pre>{@code |
| * KafkaSource<String> source = KafkaSource |
| *     .<String>builder() |
| *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS) |
| *     .setTopics(Arrays.asList(TOPIC1, TOPIC2)) |
| *     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) |
| *     .build(); |
| * }</pre> |
| * |
| * <p>The bootstrap servers, topics/partitions to consume, and the record deserializer are required |
| * fields that must be set. |
| * |
| * <p>To specify the starting offsets of the KafkaSource, one can call {@link |
| * #setStartingOffsets(OffsetsInitializer)}. |
| * |
| * <p>By default the KafkaSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never |
| * stops until the Flink job is canceled or fails. To let the KafkaSource run in {@link |
| * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, one can call {@link |
| * #setUnbounded(OffsetsInitializer)}. For example, the following KafkaSource stops after it |
| * consumes up to the latest partition offsets at the point when the Flink job starts. |
| * |
| * <pre>{@code |
| * KafkaSource<String> source = KafkaSource |
| *     .<String>builder() |
| *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS) |
| *     .setTopics(Arrays.asList(TOPIC1, TOPIC2)) |
| *     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) |
| *     .setUnbounded(OffsetsInitializer.latest()) |
| *     .build(); |
| * }</pre> |
| * |
| * <p>Check the Java docs of each individual method to learn more about the settings to build a |
| * KafkaSource. |
| */ |
| public class KafkaSourceBuilder<OUT> { |
| private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class); |
| private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}; |
| // The subscriber specifies the partitions to subscribe to. |
| private KafkaSubscriber subscriber; |
| // Users can specify the starting / stopping offset initializer. |
| private OffsetsInitializer startingOffsetsInitializer; |
| private OffsetsInitializer stoppingOffsetsInitializer; |
| // Boundedness |
| private Boundedness boundedness; |
| private KafkaRecordDeserializationSchema<OUT> deserializationSchema; |
| // The configurations. |
| protected Properties props; |
| |
| KafkaSourceBuilder() { |
| this.subscriber = null; |
| this.startingOffsetsInitializer = OffsetsInitializer.earliest(); |
| this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer(); |
| this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; |
| this.deserializationSchema = null; |
| this.props = new Properties(); |
| } |
| |
| /** |
| * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource. |
| * |
| * @param bootstrapServers the bootstrap servers of the Kafka cluster. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers) { |
| return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
| } |
| |
| /** |
| * Sets the consumer group id of the KafkaSource. |
| * |
| * @param groupId the group id of the KafkaSource. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setGroupId(String groupId) { |
| return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); |
| } |
| |
| /** |
| * Set a list of topics the KafkaSource should consume from. All the topics in the list should |
| * already exist in the Kafka cluster; otherwise an exception will be thrown. To allow some of |
| * the topics to be created lazily, use {@link #setTopicPattern(Pattern)} instead. |
| * |
| * @param topics the list of topics to consume from. |
| * @return this KafkaSourceBuilder. |
| * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) |
| */ |
| public KafkaSourceBuilder<OUT> setTopics(List<String> topics) { |
| ensureSubscriberIsNull("topics"); |
| subscriber = KafkaSubscriber.getTopicListSubscriber(topics); |
| return this; |
| } |
| |
| /** |
| * Set a list of topics the KafkaSource should consume from. All the topics in the list should |
| * already exist in the Kafka cluster; otherwise an exception will be thrown. To allow some of |
| * the topics to be created lazily, use {@link #setTopicPattern(Pattern)} instead. |
| * |
| * @param topics the list of topics to consume from. |
| * @return this KafkaSourceBuilder. |
| * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection) |
| */ |
| public KafkaSourceBuilder<OUT> setTopics(String... topics) { |
| return setTopics(Arrays.asList(topics)); |
| } |
| |
| /** |
| * Set a topic pattern to consume from, using a Java {@link Pattern}. |
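| * |
| * <p>For example, a minimal sketch that subscribes to all topics whose names match the given |
| * pattern (the pattern is illustrative): |
| * |
| * <pre>{@code |
| * builder.setTopicPattern(Pattern.compile("quickstart-.*")); |
| * }</pre> |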
| * |
| * @param topicPattern the pattern of the topic name to consume from. |
| * @return this KafkaSourceBuilder. |
| * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern) |
| */ |
| public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern) { |
| ensureSubscriberIsNull("topic pattern"); |
| subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern); |
| return this; |
| } |
| |
| /** |
| * Set a set of partitions to consume from. |
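| * |
| * <p>For example, a sketch that assigns two specific partitions (the topic name is |
| * illustrative): |
| * |
| * <pre>{@code |
| * builder.setPartitions(new HashSet<>(Arrays.asList( |
| *     new TopicPartition("my-topic", 0), |
| *     new TopicPartition("my-topic", 1)))); |
| * }</pre> |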
| * |
| * @param partitions the set of partitions to consume from. |
| * @return this KafkaSourceBuilder. |
| * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection) |
| */ |
| public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> partitions) { |
| ensureSubscriberIsNull("partitions"); |
| subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions); |
| return this; |
| } |
| |
| /** |
| * Specify from which offsets the KafkaSource should start consuming by providing an {@link |
| * OffsetsInitializer}. |
| * |
| * <p>The following {@link OffsetsInitializer}s are commonly used and provided out of the box. |
| * Users can also implement their own {@link OffsetsInitializer} for custom behaviors. |
| * |
| * <ul> |
| * <li>{@link OffsetsInitializer#earliest()} - starting from the earliest offsets. This is |
| * also the default {@link OffsetsInitializer} of the KafkaSource for starting offsets. |
| * <li>{@link OffsetsInitializer#latest()} - starting from the latest offsets. |
| * <li>{@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of |
| * the consumer group. |
| * <li>{@link |
| * OffsetsInitializer#committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy)} |
| * - starting from the committed offsets of the consumer group. If there are no committed |
| * offsets, starting from the offsets specified by the {@link |
| * org.apache.kafka.clients.consumer.OffsetResetStrategy OffsetResetStrategy}. |
| * <li>{@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each |
| * partition. |
| * <li>{@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for |
| * each partition. Note that the guarantee here is that all the records in Kafka whose |
| * {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater than |
| * the given starting timestamp will be consumed. However, it is possible that some |
| * consumer records whose timestamp is smaller than the given starting timestamp are also |
| * consumed. |
| * </ul> |
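| * |
| * <p>For example, a minimal sketch that starts from the committed offsets of the consumer |
| * group and falls back to the earliest offsets when no committed offsets exist: |
| * |
| * <pre>{@code |
| * builder.setStartingOffsets( |
| *     OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)); |
| * }</pre> |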
| * |
| * @param startingOffsetsInitializer the {@link OffsetsInitializer} setting the starting offsets |
| * for the Source. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setStartingOffsets( |
| OffsetsInitializer startingOffsetsInitializer) { |
| this.startingOffsetsInitializer = startingOffsetsInitializer; |
| return this; |
| } |
| |
| /** |
| * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode |
| * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as |
| * a streaming source but still stop at some point, one can set an {@link OffsetsInitializer} |
| * to specify the stopping offsets for each partition. When all the partitions have reached |
| * their stopping offsets, the KafkaSource will then exit. |
| * |
| * <p>This method is different from {@link #setBounded(OffsetsInitializer)} in that after |
| * setting the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will |
| * still return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though the source will stop at |
| * the offsets specified by the given {@link OffsetsInitializer}. |
| * |
| * <p>The following {@link OffsetsInitializer}s are commonly used and provided out of the box. |
| * Users can also implement their own {@link OffsetsInitializer} for custom behaviors. |
| * |
| * <ul> |
| * <li>{@link OffsetsInitializer#latest()} - stops at the latest offsets of the partitions when |
| * the KafkaSource starts to run. |
| * <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the |
| * consumer group. |
| * <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each |
| * partition. |
| * <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each |
| * partition. The guarantee of setting the stopping timestamp is that no Kafka records |
| * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater |
| * than the given stopping timestamp will be consumed. However, it is possible that some |
| * records whose timestamp is smaller than the specified stopping timestamp are not |
| * consumed. |
| * </ul> |
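| * |
| * <p>For example, a sketch that keeps the source {@link Boundedness#CONTINUOUS_UNBOUNDED} but |
| * stops once every partition reaches the offsets committed by the consumer group: |
| * |
| * <pre>{@code |
| * builder.setUnbounded(OffsetsInitializer.committedOffsets()); |
| * }</pre> |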
| * |
| * @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to specify the stopping |
| * offset. |
| * @return this KafkaSourceBuilder. |
| * @see #setBounded(OffsetsInitializer) |
| */ |
| public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) { |
| this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; |
| this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; |
| return this; |
| } |
| |
| /** |
| * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode |
| * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in |
| * {@link Boundedness#BOUNDED} mode and stop at some point, one can set an {@link |
| * OffsetsInitializer} to specify the stopping offsets for each partition. When all the |
| * partitions have reached their stopping offsets, the KafkaSource will then exit. |
| * |
| * <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} in that after setting |
| * the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will return |
| * {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}. |
| * |
| * <p>The following {@link OffsetsInitializer}s are commonly used and provided out of the box. |
| * Users can also implement their own {@link OffsetsInitializer} for custom behaviors. |
| * |
| * <ul> |
| * <li>{@link OffsetsInitializer#latest()} - stops at the latest offsets of the partitions when |
| * the KafkaSource starts to run. |
| * <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the |
| * consumer group. |
| * <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each |
| * partition. |
| * <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each |
| * partition. The guarantee of setting the stopping timestamp is that no Kafka records |
| * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater |
| * than the given stopping timestamp will be consumed. However, it is possible that some |
| * records whose timestamp is smaller than the specified stopping timestamp are not |
| * consumed. |
| * </ul> |
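| * |
| * <p>For example, a sketch of a bounded read up to a fixed point in time (the timestamp is |
| * illustrative; 1609459200000L is 2021-01-01T00:00:00Z in epoch milliseconds): |
| * |
| * <pre>{@code |
| * builder.setBounded(OffsetsInitializer.timestamp(1609459200000L)); |
| * }</pre> |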
| * |
| * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping |
| * offsets. |
| * @return this KafkaSourceBuilder. |
| * @see #setUnbounded(OffsetsInitializer) |
| */ |
| public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) { |
| this.boundedness = Boundedness.BOUNDED; |
| this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; |
| return this; |
| } |
| |
| /** |
| * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link |
| * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. |
| * |
| * @param recordDeserializer the deserializer for Kafka {@link |
| * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setDeserializer( |
| KafkaRecordDeserializationSchema<OUT> recordDeserializer) { |
| this.deserializationSchema = recordDeserializer; |
| return this; |
| } |
| |
| /** |
| * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link |
| * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given |
| * {@link DeserializationSchema} will be used to deserialize the value of the ConsumerRecord. |
| * The other information (e.g. the key) in the ConsumerRecord will be ignored. |
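| * |
| * <p>For example, a minimal sketch that reads the record values as Strings using Flink's |
| * {@link org.apache.flink.api.common.serialization.SimpleStringSchema SimpleStringSchema}: |
| * |
| * <pre>{@code |
| * KafkaSource<String> source = KafkaSource.<String>builder() |
| *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS) |
| *     .setTopics(TOPIC1) |
| *     .setValueOnlyDeserializer(new SimpleStringSchema()) |
| *     .build(); |
| * }</pre> |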
| * |
| * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setValueOnlyDeserializer( |
| DeserializationSchema<OUT> deserializationSchema) { |
| this.deserializationSchema = |
| KafkaRecordDeserializationSchema.valueOnly(deserializationSchema); |
| return this; |
| } |
| |
| /** |
| * Sets the client id prefix of this KafkaSource. |
| * |
| * @param prefix the client id prefix to use for this KafkaSource. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) { |
| return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix); |
| } |
| |
| /** |
| * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found |
| * in {@link ConsumerConfig} and {@link KafkaSourceOptions}. |
| * |
| * <p>Note that the following keys will be overridden by the builder when the KafkaSource is |
| * created. |
| * |
| * <ul> |
| * <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}. |
| * <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}. |
| * <li><code>auto.offset.reset</code> is overridden by {@link |
| * OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by |
| * default {@link OffsetsInitializer#earliest()}. |
| * <li><code>partition.discovery.interval.ms</code> is overridden to -1 when {@link |
| * #setBounded(OffsetsInitializer)} has been invoked. |
| * </ul> |
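| * |
| * <p>For example, a sketch that enables periodic partition discovery every 10 seconds (the |
| * interval is illustrative): |
| * |
| * <pre>{@code |
| * builder.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "10000"); |
| * }</pre> |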
| * |
| * @param key the key of the property. |
| * @param value the value of the property. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setProperty(String key, String value) { |
| props.setProperty(key, value); |
| return this; |
| } |
| |
| /** |
| * Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found |
| * in {@link ConsumerConfig} and {@link KafkaSourceOptions}. |
| * |
| * <p>Note that the following keys will be overridden by the builder when the KafkaSource is |
| * created. |
| * |
| * <ul> |
| * <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}. |
| * <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}. |
| * <li><code>auto.offset.reset</code> is overridden by {@link |
| * OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by |
| * default {@link OffsetsInitializer#earliest()}. |
| * <li><code>partition.discovery.interval.ms</code> is overridden to -1 when {@link |
| * #setBounded(OffsetsInitializer)} has been invoked. |
| * <li><code>client.id</code> is overridden to "client.id.prefix-RANDOM_LONG", or |
| * "group.id-RANDOM_LONG" if the client id prefix is not set. |
| * </ul> |
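| * |
| * <p>For example, a sketch that sets several consumer properties at once (the values are |
| * illustrative): |
| * |
| * <pre>{@code |
| * Properties props = new Properties(); |
| * props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); |
| * props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); |
| * builder.setProperties(props); |
| * }</pre> |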
| * |
| * @param props the properties to set for the KafkaSource. |
| * @return this KafkaSourceBuilder. |
| */ |
| public KafkaSourceBuilder<OUT> setProperties(Properties props) { |
| this.props.putAll(props); |
| return this; |
| } |
| |
| /** |
| * Build the {@link KafkaSource}. |
| * |
| * @return a KafkaSource with the settings made for this builder. |
| */ |
| public KafkaSource<OUT> build() { |
| sanityCheck(); |
| parseAndSetRequiredProperties(); |
| return new KafkaSource<>( |
| subscriber, |
| startingOffsetsInitializer, |
| stoppingOffsetsInitializer, |
| boundedness, |
| deserializationSchema, |
| props); |
| } |
| |
| // ------------- private helpers -------------- |
| |
| private void ensureSubscriberIsNull(String attemptingSubscribeMode) { |
| if (subscriber != null) { |
| throw new IllegalStateException( |
| String.format( |
| "Cannot use %s for consumption because a %s is already set for consumption.", |
| attemptingSubscribeMode, subscriber.getClass().getSimpleName())); |
| } |
| } |
| |
| private void parseAndSetRequiredProperties() { |
| maybeOverride( |
| ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, |
| ByteArrayDeserializer.class.getName(), |
| true); |
| maybeOverride( |
| ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, |
| ByteArrayDeserializer.class.getName(), |
| true); |
| if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { |
| LOG.warn( |
| "Offset commit on checkpoint is disabled because {} is not specified", |
| ConsumerConfig.GROUP_ID_CONFIG); |
| maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false); |
| } |
| maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false); |
| maybeOverride( |
| ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, |
| startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(), |
| true); |
| |
| // If the source is bounded, do not run periodic partition discovery. |
| maybeOverride( |
| KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), |
| "-1", |
| boundedness == Boundedness.BOUNDED); |
| |
| // If the client id prefix is not set, reuse the consumer group id as the client id prefix, |
| // or generate a random string if consumer group id is not specified. |
| maybeOverride( |
| KafkaSourceOptions.CLIENT_ID_PREFIX.key(), |
| props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) |
| ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) |
| : "KafkaSource-" + new Random().nextLong(), |
| false); |
| } |
| |
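| /** |
| * Sets {@code value} for {@code key} if the user has not provided a value, or, when {@code |
| * override} is true, replaces the user-provided value and logs a warning. |
| * |
| * @return true if a user-provided value was overridden. |
| */ |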
| private boolean maybeOverride(String key, String value, boolean override) { |
| boolean overridden = false; |
| String userValue = props.getProperty(key); |
| if (userValue != null) { |
| if (override) { |
| LOG.warn( |
| String.format( |
| "Property %s is provided but will be overridden from %s to %s", |
| key, userValue, value)); |
| props.setProperty(key, value); |
| overridden = true; |
| } |
| } else { |
| props.setProperty(key, value); |
| } |
| return overridden; |
| } |
| |
| private void sanityCheck() { |
| // Check required configs. |
| for (String requiredConfig : REQUIRED_CONFIGS) { |
| checkNotNull( |
| props.getProperty(requiredConfig), |
| String.format("Property %s is required but not provided", requiredConfig)); |
| } |
| // Check required settings. |
| checkNotNull( |
| subscriber, |
| "No subscribe mode is specified, " |
| + "should be one of topics, topic pattern and partition set."); |
| checkNotNull(deserializationSchema, "Deserialization schema is required but not provided."); |
| // Check consumer group ID |
| checkState( |
| props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(), |
| String.format( |
| "Property %s is required when offset commit is enabled", |
| ConsumerConfig.GROUP_ID_CONFIG)); |
| // Check offsets initializers |
| if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) { |
| ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props); |
| } |
| if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) { |
| ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props); |
| } |
| } |
| |
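| /** Returns true if the user has explicitly enabled offset committing via the properties. */ |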
| private boolean offsetCommitEnabledManually() { |
| boolean autoCommit = |
| props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) |
| && Boolean.parseBoolean( |
| props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); |
| boolean commitOnCheckpoint = |
| props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()) |
| && Boolean.parseBoolean( |
| props.getProperty( |
| KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())); |
| return autoCommit || commitOnCheckpoint; |
| } |
| } |