blob: 499a1f8818e8f97eef8c9169f17c1e97ae360836 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics.
*
* <h2>Read from Kafka as {@link UnboundedSource}</h2>
*
* <h3>Reading from Kafka topics</h3>
*
* <p>KafkaIO source returns unbounded collection of Kafka records as {@code
* PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic metadata like
* topic-partition and offset, along with key and value associated with a Kafka record.
*
* <p>Although most applications consume a single topic, the source can be configured to consume
* multiple topics or even a specific set of {@link TopicPartition}s.
*
* <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>,
* one or more topics to consume, and key and value deserializers. For example:
*
* <pre>{@code
* pipeline
* .apply(KafkaIO.<Long, String>read()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
* .withKeyDeserializer(LongDeserializer.class)
* .withValueDeserializer(StringDeserializer.class)
*
* // Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
*
* // Rest of the settings are optional :
*
* // you can further customize KafkaConsumer used to read the records by adding more
* // settings for ConsumerConfig. e.g :
* .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
*
* // set event times and watermark based on 'LogAppendTime'. To provide a custom
* // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
* // Use withCreateTime() with topics that have 'CreateTime' timestamps.
* .withLogAppendTime()
*
* // restrict reader to committed messages on Kafka (see method documentation).
* .withReadCommitted()
*
* // offset consumed by the pipeline can be committed back.
* .commitOffsetsInFinalize()
*
* // Specified a serializable function which can determine whether to stop reading from given
* // TopicPartition during runtime. Note that only {@link ReadFromKafkaDoFn} respect the
* // signal.
* .withCheckStopReadingFn(new SerializedFunction<TopicPartition, Boolean>() {})
*
* // finally, if you don't need Kafka metadata, you can drop it.g
* .withoutMetadata() // PCollection<KV<Long, String>>
* )
* .apply(Values.<String>create()) // PCollection<String>
* ...
* }</pre>
*
* <p>Kafka provides deserializers for common types in {@link
* org.apache.kafka.common.serialization}. In addition to deserializers, Beam runners need {@link
* Coder} to materialize key and value objects if necessary. In most cases, you don't need to
* specify {@link Coder} for key and value in the resulting collection because the coders are
* inferred from deserializer types. However, in cases when coder inference fails, they can be
* specified explicitly along with deserializers using {@link
* Read#withKeyDeserializerAndCoder(Class, Coder)} and {@link
* Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are interpreted using
* key and value <i>deserializers</i>.
*
* <h3>Read From Kafka Dynamically</h3>
*
* For a given kafka bootstrap_server, KafkaIO is also able to detect and read from available {@link
* TopicPartition} dynamically and stop reading from un. KafkaIO uses {@link
* WatchKafkaTopicPartitionDoFn} to emit any new added {@link TopicPartition} and uses {@link
* ReadFromKafkaDoFn} to read from each {@link KafkaSourceDescriptor}. Dynamic read is able to solve
* 2 scenarios:
*
* <ul>
* <li>Certain topic or partition is added/deleted.
* <li>Certain topic or partition is added, then removed but added back again
* </ul>
*
* Within providing {@code checkStopReadingFn}, there are 2 more cases that dynamic read can handle:
*
* <ul>
* <li>Certain topic or partition is stopped
* <li>Certain topic or partition is added, then stopped but added back again
* </ul>
*
* Race conditions may happen under 2 supported cases:
*
* <ul>
* <li>A TopicPartition is removed, but added backed again
* <li>A TopicPartition is stopped, then want to read it again
* </ul>
*
* When race condition happens, it will result in the stopped/removed TopicPartition failing to be
* emitted to ReadFromKafkaDoFn again. Or ReadFromKafkaDoFn will output replicated records. The
* major cause for such race condition is that both {@link WatchKafkaTopicPartitionDoFn} and {@link
* ReadFromKafkaDoFn} react to the signal from removed/stopped {@link TopicPartition} but we cannot
* guarantee that both DoFns perform related actions at the same time.
*
* <p>Here is one example for failing to emit new added {@link TopicPartition}:
*
* <ul>
* <li>A {@link WatchKafkaTopicPartitionDoFn} is configured with updating the current tracking set
* every 1 hour.
* <li>One TopicPartition A is tracked by the {@link WatchKafkaTopicPartitionDoFn} at 10:00AM and
* {@link ReadFromKafkaDoFn} starts to read from TopicPartition A immediately.
* <li>At 10:30AM, the {@link WatchKafkaTopicPartitionDoFn} notices that the {@link
* TopicPartition} has been stopped/removed, so it stops reading from it and returns {@code
* ProcessContinuation.stop()}.
* <li>At 10:45 the pipeline author wants to read from TopicPartition A again.
* <li>At 11:00AM when {@link WatchKafkaTopicPartitionDoFn} is invoked by firing timer, it doesn’t
* know that TopicPartition A has been stopped/removed. All it knows is that TopicPartition A
* is still an active TopicPartition and it will not emit TopicPartition A again.
* </ul>
*
* Another race condition example for producing duplicate records:
*
* <ul>
* <li>At 10:00AM, {@link ReadFromKafkaDoFn} is processing TopicPartition A
* <li>At 10:05AM, {@link ReadFromKafkaDoFn} starts to process other TopicPartitions(sdf-initiated
* checkpoint or runner-issued checkpoint happens)
* <li>At 10:10AM, {@link WatchKafkaTopicPartitionDoFn} knows that TopicPartition A is
* stopped/removed
* <li>At 10:15AM, {@link WatchKafkaTopicPartitionDoFn} knows that TopicPartition A is added again
* and emits TopicPartition A again
* <li>At 10:20AM, {@link ReadFromKafkaDoFn} starts to process resumed TopicPartition A but at the
* same time {@link ReadFromKafkaDoFn} is also processing the new emitted TopicPartitionA.
* </ul>
*
* For more design details, please refer to
* https://docs.google.com/document/d/1FU3GxVRetHPLVizP3Mdv6mP5tpjZ3fd99qNjUI5DT5k/. To enable
* dynamic read, you can write a pipeline like:
*
* <pre>{@code
* pipeline
* .apply(KafkaIO.<Long, String>read()
* // Configure the dynamic read with 1 hour, where the pipeline will look into available
* // TopicPartitions and emit new added ones every 1 hour.
* .withDynamicRead(Duration.standardHours(1))
* .withCheckStopReadingFn(new SerializedFunction<TopicPartition, Boolean>() {})
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withKeyDeserializer(LongDeserializer.class)
* .withValueDeserializer(StringDeserializer.class)
* )
* .apply(Values.<String>create()) // PCollection<String>
* ...
* }</pre>
*
* <h3>Partition Assignment and Checkpointing</h3>
*
* The Kafka partitions are evenly distributed among splits (workers).
*
* <p>Checkpointing is fully supported and each split can resume from previous checkpoint (to the
* extent supported by runner). See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for
* more details on splits and checkpoint support.
*
* <p>When the pipeline starts for the first time, or without any checkpoint, the source starts
* consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
* beginning by setting properties appropriately in {@link ConsumerConfig}, through {@link
* Read#withConsumerConfigUpdates(Map)}. You can also enable offset auto_commit in Kafka to resume
* from last committed.
*
* <p>In summary, KafkaIO.read follows below sequence to set initial offset:<br>
* 1. {@link KafkaCheckpointMark} provided by runner;<br>
* 2. Consumer offset stored in Kafka when {@code ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true};
* <br>
* 3. Start from <em>latest</em> offset by default;
*
* <p>Seek to initial offset is a blocking operation in Kafka API, which can block forever for
* certain versions of Kafka client library. This is resolved by <a
* href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior">KIP-266</a>
* which provides `default.api.timeout.ms` consumer config setting to control such timeouts.
* KafkaIO.read implements timeout itself, to not to block forever in case older Kafka client is
* used. It does recognize `default.api.timeout.ms` setting and will honor the timeout value if it
* is passes in consumer config.
*
* <h3>Use Avro schema with Confluent Schema Registry</h3>
*
* <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
* Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
* for deserialization. A {@link Coder} will be inferred automatically based on the respective
* {@link Deserializer}.
*
* <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
* and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
* don't need to specify key or/and value deserializers and coders since they will be set to {@link
* KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.
*
* <p>For example, below topic values are serialized with Avro schema stored in Schema Registry,
* keys are typed as {@link Long}:
*
* <pre>{@code
* PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
* .apply(KafkaIO.<Long, GenericRecord>read()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("my_topic")
* .withKeyDeserializer(LongDeserializer.class)
* // Use Confluent Schema Registry, specify schema registry URL and value subject
* .withValueDeserializer(
* ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
* ...
* }</pre>
*
* <h2>Read from Kafka as a {@link DoFn}</h2>
*
* {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link
* KafkaSourceDescriptor} as input and outputs a PCollection of {@link KafkaRecord}. The core
* implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code
* SplittableDoFn}, please refer to the <a
* href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
* href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
* KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link
* KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
* KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
* pipeline can populate these source descriptions during runtime. For example, the pipeline can
* query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}.
*
* <h3>Common Kafka Consumer Configurations</h3>
*
* <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
*
* <ul>
* <li>{@link ReadSourceDescriptors#getConsumerConfig()} is the same as {@link
* KafkaIO.Read#getConsumerConfig()}.
* <li>{@link ReadSourceDescriptors#getConsumerFactoryFn()} is the same as {@link
* KafkaIO.Read#getConsumerFactoryFn()}.
* <li>{@link ReadSourceDescriptors#getOffsetConsumerConfig()} is the same as {@link
* KafkaIO.Read#getOffsetConsumerConfig()}.
* <li>{@link ReadSourceDescriptors#getKeyCoder()} is the same as {@link
* KafkaIO.Read#getKeyCoder()}.
* <li>{@link ReadSourceDescriptors#getValueCoder()} is the same as {@link
* KafkaIO.Read#getValueCoder()}.
* <li>{@link ReadSourceDescriptors#getKeyDeserializerProvider()} is the same as {@link
* KafkaIO.Read#getKeyDeserializerProvider()}.
* <li>{@link ReadSourceDescriptors#getValueDeserializerProvider()} is the same as {@link
* KafkaIO.Read#getValueDeserializerProvider()}.
* <li>{@link ReadSourceDescriptors#isCommitOffsetEnabled()} has the same meaning as {@link
* KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
* </ul>
*
* <p>For example, to create a basic {@link ReadSourceDescriptors} transform:
*
* <pre>{@code
* pipeline
* .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1)))
* .apply(KafkaIO.readAll()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class));
* }</pre>
*
* Note that the {@code bootstrapServers} can also be populated from the {@link
* KafkaSourceDescriptor}:
*
* <pre>{@code
* pipeline
* .apply(Create.of(
* KafkaSourceDescriptor.of(
* new TopicPartition("topic", 1),
* null,
* null,
* ImmutableList.of("broker_1:9092", "broker_2:9092"))
* .apply(KafkaIO.readAll()
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class));
* }</pre>
*
* <h3>Configurations of {@link ReadSourceDescriptors}</h3>
*
* <p>Except configurations of Kafka Consumer, there are some other configurations which are related
* to processing records.
*
* <p>{@link ReadSourceDescriptors#commitOffsets()} enables committing offset after processing the
* record. Note that if the {@code isolation.level} is set to "read_committed" or {@link
* ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, the {@link
* ReadSourceDescriptors#commitOffsets()} will be ignored.
*
* <p>{@link ReadSourceDescriptors#withExtractOutputTimestampFn(SerializableFunction)} is used to
* compute the {@code output timestamp} for a given {@link KafkaRecord} and controls the watermark
* advancement. There are three built-in types:
*
* <ul>
* <li>{@link ReadSourceDescriptors#withProcessingTime()}
* <li>{@link ReadSourceDescriptors#withCreateTime()}
* <li>{@link ReadSourceDescriptors#withLogAppendTime()}
* </ul>
*
* <p>For example, to create a {@link ReadSourceDescriptors} with this additional configuration:
*
* <pre>{@code
* pipeline
* .apply(Create.of(
* KafkaSourceDescriptor.of(
* new TopicPartition("topic", 1),
* null,
* null,
* ImmutableList.of("broker_1:9092", "broker_2:9092"))
* .apply(KafkaIO.readAll()
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class)
* .withProcessingTime()
* .commitOffsets());
* }</pre>
*
* <h3>Writing to Kafka</h3>
*
* <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the
* values or native Kafka producer records using {@link
* org.apache.kafka.clients.producer.ProducerRecord}. To configure a Kafka sink, you must specify at
* the minimum Kafka <tt>bootstrapServers</tt>, the topic to write to, and key and value
* serializers. For example:
*
* <pre>{@code
* PCollection<KV<Long, String>> kvColl = ...;
* kvColl.apply(KafkaIO.<Long, String>write()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("results")
*
* .withKeySerializer(LongSerializer.class)
* .withValueSerializer(StringSerializer.class)
*
* // You can further customize KafkaProducer used to write the records by adding more
* // settings for ProducerConfig. e.g, to enable compression :
* .withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))
*
* // You set publish timestamp for the Kafka records.
* .withInputTimestamp() // element timestamp is used while publishing to Kafka
* // or you can also set a custom timestamp with a function.
* .withPublishTimestampFunction((elem, elemTs) -> ...)
*
* // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
* .withEOS(20, "eos-sink-group-id");
* );
* }</pre>
*
* <p>Often you might want to write just values without any keys to Kafka. Use {@code values()} to
* write records with default empty(null) key:
*
* <pre>{@code
* PCollection<String> strings = ...;
* strings.apply(KafkaIO.<Void, String>write()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("results")
* .withValueSerializer(StringSerializer.class) // just need serializer for value
* .values()
* );
* }</pre>
*
* <p>Also, if you want to write Kafka {@link ProducerRecord} then you should use {@link
* KafkaIO#writeRecords()}:
*
* <pre>{@code
* PCollection<ProducerRecord<Long, String>> records = ...;
* records.apply(KafkaIO.<Long, String>writeRecords()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("results")
* .withKeySerializer(LongSerializer.class)
* .withValueSerializer(StringSerializer.class)
* );
* }</pre>
*
* <h3>Advanced Kafka Configuration</h3>
*
* KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in {@link
* ProducerConfig} for sink. E.g. if you would like to enable offset <em>auto commit</em> (for
* external monitoring or other purposes), you can set <tt>"group.id"</tt>,
* <tt>"enable.auto.commit"</tt>, etc.
*
* <h3>Event Timestamps and Watermark</h3>
*
* By default, record timestamp (event time) is set to processing time in KafkaIO reader and source
* watermark is current wall time. If a topic has Kafka server-side ingestion timestamp enabled
* ('LogAppendTime'), it can enabled with {@link Read#withLogAppendTime()}. A custom timestamp
* policy can be provided by implementing {@link TimestampPolicyFactory}. See {@link
* Read#withTimestampPolicyFactory(TimestampPolicyFactory)} for more information.
*
* <h3>Supported Kafka Client Versions</h3>
*
* KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
* <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions 0.9.x
* - 0.10.0.0 are also supported, but are deprecated and likely be removed in near future. Please
* ensure that the version included with the application is compatible with the version of your
* Kafka cluster. Kafka client usually fails to initialize with a clear error message in case of
* incompatibility.
*/
@Experimental(Kind.SOURCE_SINK)
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class KafkaIO {
/**
* A specific instance of uninitialized {@link #read()} where key and values are bytes. See
* #read().
*/
public static Read<byte[], byte[]> readBytes() {
return KafkaIO.<byte[], byte[]>read()
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class);
}
/**
* Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration
* should set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}.
* Other optional settings include key and value {@link Deserializer}s, custom timestamp,
* watermark functions.
*/
public static <K, V> Read<K, V> read() {
return new AutoValue_KafkaIO_Read.Builder<K, V>()
.setTopics(new ArrayList<>())
.setTopicPartitions(new ArrayList<>())
.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
.setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
.setMaxNumRecords(Long.MAX_VALUE)
.setCommitOffsetsInFinalizeEnabled(false)
.setDynamicRead(false)
.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
.build();
}
/**
* Creates an uninitialized {@link ReadSourceDescriptors} {@link PTransform}. Different from
* {@link Read}, setting up {@code topics} and {@code bootstrapServers} is not required during
* construction time. But the {@code bootstrapServers} still can be configured {@link
* ReadSourceDescriptors#withBootstrapServers(String)}. Please refer to {@link
* ReadSourceDescriptors} for more details.
*/
public static <K, V> ReadSourceDescriptors<K, V> readSourceDescriptors() {
return ReadSourceDescriptors.<K, V>read();
}
/**
* Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
* should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} along
* with {@link Deserializer}s for (optional) key and values.
*/
public static <K, V> Write<K, V> write() {
return new AutoValue_KafkaIO_Write.Builder<K, V>()
.setWriteRecordsTransform(
new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
.setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
.setEOS(false)
.setNumShards(0)
.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
.build())
.build();
}
/**
* Creates an uninitialized {@link WriteRecords} {@link PTransform}. Before use, Kafka
* configuration should be set with {@link WriteRecords#withBootstrapServers(String)} and {@link
* WriteRecords#withTopic} along with {@link Deserializer}s for (optional) key and values.
*/
public static <K, V> WriteRecords<K, V> writeRecords() {
return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
.setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
.setEOS(false)
.setNumShards(0)
.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
.build();
}
///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
/**
* A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on
* usage and configuration.
*/
@AutoValue
@AutoValue.CopyAnnotations
@SuppressWarnings({"rawtypes"})
public abstract static class Read<K, V>
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
abstract Map<String, Object> getConsumerConfig();
abstract @Nullable List<String> getTopics();
abstract @Nullable List<TopicPartition> getTopicPartitions();
abstract @Nullable Coder<K> getKeyCoder();
abstract @Nullable Coder<V> getValueCoder();
abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
getConsumerFactoryFn();
abstract @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();
abstract long getMaxNumRecords();
abstract @Nullable Duration getMaxReadTime();
abstract @Nullable Instant getStartReadTime();
abstract boolean isCommitOffsetsInFinalizeEnabled();
abstract boolean isDynamicRead();
abstract @Nullable Duration getWatchTopicPartitionDuration();
abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
abstract @Nullable Map<String, Object> getOffsetConsumerConfig();
abstract @Nullable DeserializerProvider getKeyDeserializerProvider();
abstract @Nullable DeserializerProvider getValueDeserializerProvider();
abstract @Nullable SerializableFunction<TopicPartition, Boolean> getCheckStopReadingFn();
abstract Builder<K, V> toBuilder();
@Experimental(Kind.PORTABILITY)
@AutoValue.Builder
abstract static class Builder<K, V>
implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
abstract Builder<K, V> setTopics(List<String> topics);
abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
abstract Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
abstract Builder<K, V> setStartReadTime(Instant startReadTime);
abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
abstract Builder<K, V> setDynamicRead(boolean dynamicRead);
abstract Builder<K, V> setWatchTopicPartitionDuration(Duration duration);
abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);
abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
abstract Builder<K, V> setKeyDeserializerProvider(DeserializerProvider deserializerProvider);
abstract Builder<K, V> setValueDeserializerProvider(
DeserializerProvider deserializerProvider);
abstract Builder<K, V> setCheckStopReadingFn(
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn);
abstract Read<K, V> build();
@Override
public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
External.Configuration config) {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
for (String topic : config.topics) {
listBuilder.add(topic);
}
setTopics(listBuilder.build());
Class keyDeserializer = resolveClass(config.keyDeserializer);
setKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
setKeyCoder(resolveCoder(keyDeserializer));
Class valueDeserializer = resolveClass(config.valueDeserializer);
setValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
setValueCoder(resolveCoder(valueDeserializer));
Map<String, Object> consumerConfig = new HashMap<>(config.consumerConfig);
// Key and Value Deserializers always have to be in the config.
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
consumerConfig.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
setConsumerConfig(consumerConfig);
// Set required defaults
setTopicPartitions(Collections.emptyList());
setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN);
if (config.maxReadTime != null) {
setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
}
setMaxNumRecords(config.maxNumRecords == null ? Long.MAX_VALUE : config.maxNumRecords);
// Set committing offset configuration.
setCommitOffsetsInFinalizeEnabled(config.commitOffsetInFinalize);
// Set timestamp policy with built-in types.
String timestampPolicy = config.timestampPolicy;
if (timestampPolicy.equals("ProcessingTime")) {
setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
} else if (timestampPolicy.equals("CreateTime")) {
setTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(Duration.ZERO));
} else if (timestampPolicy.equals("LogAppendTime")) {
setTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
} else {
throw new IllegalArgumentException(
"timestampPolicy should be one of (ProcessingTime, CreateTime, LogAppendTime)");
}
if (config.startReadTime != null) {
setStartReadTime(Instant.ofEpochMilli(config.startReadTime));
}
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
// implementation.
setDynamicRead(false);
// We do not include Metadata until we can encode KafkaRecords cross-language
return build().withoutMetadata();
}
private static Coder resolveCoder(Class deserializer) {
for (Method method : deserializer.getDeclaredMethods()) {
if (method.getName().equals("deserialize")) {
Class<?> returnType = method.getReturnType();
if (returnType.equals(Object.class)) {
continue;
}
if (returnType.equals(byte[].class)) {
return ByteArrayCoder.of();
} else if (returnType.equals(Integer.class)) {
return VarIntCoder.of();
} else if (returnType.equals(Long.class)) {
return VarLongCoder.of();
} else {
throw new RuntimeException("Couldn't infer Coder from " + deserializer);
}
}
}
throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);
}
}
/**
* Exposes {@link KafkaIO.TypedWithoutMetadata} as an external transform for cross-language
* usage.
*/
@Experimental(Kind.PORTABILITY)
@AutoService(ExternalTransformRegistrar.class)
public static class External implements ExternalTransformRegistrar {
public static final String URN = "beam:external:java:kafka:read:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(
URN,
(Class<? extends ExternalTransformBuilder<?, ?, ?>>)
(Class<?>) AutoValue_KafkaIO_Read.Builder.class);
}
/** Parameters class to expose the Read transform to an external SDK. */
public static class Configuration {
private Map<String, String> consumerConfig;
private List<String> topics;
private String keyDeserializer;
private String valueDeserializer;
private Long startReadTime;
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
private String timestampPolicy;
public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
}
public void setTopics(List<String> topics) {
this.topics = topics;
}
public void setKeyDeserializer(String keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
public void setValueDeserializer(String valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
public void setStartReadTime(Long startReadTime) {
this.startReadTime = startReadTime;
}
public void setMaxNumRecords(Long maxNumRecords) {
this.maxNumRecords = maxNumRecords;
}
public void setMaxReadTime(Long maxReadTime) {
this.maxReadTime = maxReadTime;
}
public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
this.commitOffsetInFinalize = commitOffsetInFinalize;
}
public void setTimestampPolicy(String timestampPolicy) {
this.timestampPolicy = timestampPolicy;
}
}
}
/** Sets the bootstrap servers for the Kafka consumer. */
public Read<K, V> withBootstrapServers(String bootstrapServers) {
return withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}
/**
* Sets the topic to read from.
*
* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
* partitions are distributed among the splits.
*/
public Read<K, V> withTopic(String topic) {
return withTopics(ImmutableList.of(topic));
}
/**
* Sets a list of topics to read from. All the partitions from each of the topics are read.
*
* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
* partitions are distributed among the splits.
*/
public Read<K, V> withTopics(List<String> topics) {
checkState(
getTopicPartitions() == null || getTopicPartitions().isEmpty(),
"Only topics or topicPartitions can be set, not both");
return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
}
/**
* Sets a list of partitions to read from. This allows reading only a subset of partitions for
* one or more topics when (if ever) needed.
*
* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
* partitions are distributed among the splits.
*/
public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
checkState(
getTopics() == null || getTopics().isEmpty(),
"Only topics or topicPartitions can be set, not both");
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
}
/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
* <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
* runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
* however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
* provide the key coder explicitly.
*/
public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
return withKeyDeserializer(LocalDeserializerProvider.of(keyDeserializer));
}
/**
* Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
*
* <p>Use this method only if your pipeline doesn't work with plain {@link
* #withKeyDeserializer(Class)}.
*/
public Read<K, V> withKeyDeserializerAndCoder(
Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
}
public Read<K, V> withKeyDeserializer(DeserializerProvider<K> deserializerProvider) {
return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
}
/**
* Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
*
* <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize value objects at
* runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}
* class, however in case that fails, you can use {@link #withValueDeserializerAndCoder(Class,
* Coder)} to provide the value coder explicitly.
*/
public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
return withValueDeserializer(LocalDeserializerProvider.of(valueDeserializer));
}
/**
* Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
*
* <p>Use this method only if your pipeline doesn't work with plain {@link
* #withValueDeserializer(Class)}.
*/
public Read<K, V> withValueDeserializerAndCoder(
Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
}
public Read<K, V> withValueDeserializer(DeserializerProvider<V> deserializerProvider) {
return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
}
/**
* A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
* supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
*/
public Read<K, V> withConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
}
/**
* Update consumer configuration with new properties.
*
* @deprecated as of version 2.13. Use {@link #withConsumerConfigUpdates(Map)} instead
*/
@Deprecated
public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
Map<String, Object> config =
KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
return toBuilder().setConsumerConfig(config).build();
}
/**
* Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords(long)}. Mainly used
* for tests and demo applications.
*/
public Read<K, V> withMaxNumRecords(long maxNumRecords) {
return toBuilder().setMaxNumRecords(maxNumRecords).build();
}
/**
* Use timestamp to set up start offset. It is only supported by Kafka Client 0.10.1.0 onwards
* and the message format version after 0.10.0.
*
* <p>Note that this take priority over start offset configuration {@code
* ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto committed offsets.
*
* <p>This results in hard failures in either of the following two cases : 1. If one of more
* partitions do not contain any messages with timestamp larger than or equal to desired
* timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the
* messages do not have timestamps.
*/
public Read<K, V> withStartReadTime(Instant startReadTime) {
return toBuilder().setStartReadTime(startReadTime).build();
}
/**
* Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. Mainly
* used for tests and demo applications.
*/
public Read<K, V> withMaxReadTime(Duration maxReadTime) {
return toBuilder().setMaxReadTime(maxReadTime).build();
}
/**
* Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}. The
* policy assigns Kafka's log append time (server side ingestion time) to each record. The
* watermark for each Kafka partition is the timestamp of the last record read. If a partition
* is idle, the watermark advances to couple of seconds behind wall time. Every record consumed
* from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.
*
* <p>In Kafka, log append time needs to be enabled for each topic, and all the subsequent
* records wil have their timestamp set to log append time. If a record does not have its
* timestamp type set to 'LOG_APPEND_TIME' for any reason, it's timestamp is set to previous
* record timestamp or latest watermark, whichever is larger.
*
* <p>The watermark for the entire source is the oldest of each partition's watermark. If one of
* the readers falls behind possibly due to uneven distribution of records among Kafka
* partitions, it ends up holding the watermark for the entire source.
*/
public Read<K, V> withLogAppendTime() {
return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());
}
/**
* Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}. This is
* the default timestamp policy. It assigns processing time to each record. Specifically, this
* is the timestamp when the record becomes 'current' in the reader. The watermark aways
* advances to current time. If server side time (log append time) is enabled in Kafka, {@link
* #withLogAppendTime()} is recommended over this.
*/
public Read<K, V> withProcessingTime() {
return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());
}
/**
* Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the
* records. It is an error if a record's timestamp type is not {@link
* KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to be roughly
* monotonically increasing with a cap on out of order delays (e.g. 'max delay' of 1 minute).
* The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.
* However, watermark is never set in future and capped to 'now - max delay'. In addition,
* watermark advanced to 'now - max delay' when a partition is idle.
*
* @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent record
* is expected to be after {@code current record timestamp - maxDelay}.
*/
public Read<K, V> withCreateTime(Duration maxDelay) {
return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));
}
/**
* Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each
* partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} is
* invoked for each partition when the reader starts.
*
* @see #withLogAppendTime()
* @see #withCreateTime(Duration)
* @see #withProcessingTime()
*/
public Read<K, V> withTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory) {
return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();
}
/**
* A function to assign a timestamp to a record. Default is processing timestamp.
*
* @deprecated as of version 2.4. Use {@link
* #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
@Deprecated
public Read<K, V> withTimestampFn2(
SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn can not be null");
return toBuilder()
.setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn))
.build();
}
/**
* A function to calculate watermark after a record. Default is last record timestamp.
*
* @see #withTimestampFn(SerializableFunction)
* @deprecated as of version 2.4. Use {@link
* #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
@Deprecated
public Read<K, V> withWatermarkFn2(
SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
checkArgument(watermarkFn != null, "watermarkFn can not be null");
return toBuilder().setWatermarkFn(watermarkFn).build();
}
/**
* A function to assign a timestamp to a record. Default is processing timestamp.
*
* @deprecated as of version 2.4. Use {@link
* #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
@Deprecated
public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
checkArgument(timestampFn != null, "timestampFn can not be null");
return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
}
/**
* A function to calculate watermark after a record. Default is last record timestamp.
*
* @see #withTimestampFn(SerializableFunction)
* @deprecated as of version 2.4. Use {@link
* #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.
*/
@Deprecated
public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
checkArgument(watermarkFn != null, "watermarkFn can not be null");
return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
}
/**
* Sets "isolation_level" to "read_committed" in Kafka consumer configuration. This is ensures
* that the consumer does not read uncommitted messages. Kafka version 0.11 introduced
* transactional writes. Applications requiring end-to-end exactly-once semantics should only
* read committed messages. See JavaDoc for {@link KafkaConsumer} for more description.
*/
public Read<K, V> withReadCommitted() {
return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
}
/**
* Finalized offsets are committed to Kafka. See {@link CheckpointMark#finalizeCheckpoint()}. It
* helps with minimizing gaps or duplicate processing of records while restarting a pipeline
* from scratch. But it does not provide hard processing guarantees. There could be a short
* delay to commit after {@link CheckpointMark#finalizeCheckpoint()} is invoked, as reader might
* be blocked on reading from Kafka. Note that it is independent of 'AUTO_COMMIT' Kafka consumer
* configuration. Usually either this or AUTO_COMMIT in Kafka consumer is enabled, but not both.
*/
public Read<K, V> commitOffsetsInFinalize() {
return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
}
/**
* Configure the KafkaIO to use {@link WatchKafkaTopicPartitionDoFn} to detect and emit any new
* available {@link TopicPartition} for {@link ReadFromKafkaDoFn} to consume during pipeline
* execution time. The KafkaIO will regularly check the availability based on the given
* duration. If the duration is not specified as {@code null}, the default duration is 1 hour.
*/
public Read<K, V> withDynamicRead(Duration duration) {
return toBuilder().setDynamicRead(true).setWatchTopicPartitionDuration(duration).build();
}
/**
* Set additional configuration for the backend offset consumer. It may be required for a
* secured Kafka cluster, especially when you see similar WARN log message 'exception while
* fetching latest offset for partition {}. will be retried'.
*
* <p>In {@link KafkaIO#read()}, there're two consumers running in the backend actually:<br>
* 1. the main consumer, which reads data from kafka;<br>
* 2. the secondary offset consumer, which is used to estimate backlog, by fetching latest
* offset;<br>
*
* <p>By default, offset consumer inherits the configuration from main consumer, with an
* auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
* which requires more configurations.
*/
public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
}
/**
* Update configuration for the backend main consumer. Note that the default consumer properties
* will not be completely overridden. This method only updates the value which has the same key.
*
* <p>In {@link KafkaIO#read()}, there're two consumers running in the backend actually:<br>
* 1. the main consumer, which reads data from kafka;<br>
* 2. the secondary offset consumer, which is used to estimate backlog, by fetching latest
* offset;<br>
*
* <p>By default, main consumer uses the configuration from {@link
* KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
*/
public Read<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
Map<String, Object> config =
KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
return toBuilder().setConsumerConfig(config).build();
}
/**
* A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn}
* should stop reading from the given {@link TopicPartition}.
*/
public Read<K, V> withCheckStopReadingFn(
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
}
/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
return new TypedWithoutMetadata<>(this);
}
@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
checkArgument(
getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
"withBootstrapServers() is required");
// With dynamic read, we no longer require providing topic/partition during pipeline
// construction time. But it requires enabling beam_fn_api.
if (!isDynamicRead()) {
checkArgument(
(getTopics() != null && getTopics().size() > 0)
|| (getTopicPartitions() != null && getTopicPartitions().size() > 0),
"Either withTopic(), withTopics() or withTopicPartitions() is required");
} else {
checkArgument(
ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
"Kafka Dynamic Read requires enabling experiment beam_fn_api.");
}
checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");
if (!ConsumerSpEL.hasOffsetsForTimes()) {
LOG.warn(
"Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and "
+ "may not be supported in next release of Apache Beam. "
+ "Please upgrade your Kafka client version.",
AppInfoParser.getVersion());
}
if (getStartReadTime() != null) {
checkArgument(
ConsumerSpEL.hasOffsetsForTimes(),
"Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
+ "current version of Kafka Client is "
+ AppInfoParser.getVersion()
+ ". If you are building with maven, set \"kafka.clients.version\" "
+ "maven property to 0.10.1.0 or newer.");
}
if (isCommitOffsetsInFinalizeEnabled()) {
checkArgument(
getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,
"commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config "
+ "is not set. Offset management requires group.id.");
if (Boolean.TRUE.equals(
getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
LOG.warn(
"'{}' in consumer config is enabled even though commitOffsetsInFinalize() "
+ "is set. You need only one of them.",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
}
// Infer key/value coders if not specified explicitly
CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
// For read from unbounded in a bounded manner, we actually are not going through Read or SDF.
if (ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "use_deprecated_read")
|| getMaxNumRecords() < Long.MAX_VALUE
|| getMaxReadTime() != null) {
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}
/**
* A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka
* read if runners doesn't have a good support on executing unbounded Splittable DoFn.
*/
@Internal
public static final PTransformOverride KAFKA_READ_OVERRIDE =
PTransformOverride.of(
PTransformMatchers.classEqualTo(ReadFromKafkaViaSDF.class),
new KafkaReadOverrideFactory<>());
private static class KafkaReadOverrideFactory<K, V>
implements PTransformOverrideFactory<
PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>> {
@Override
public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, ReadFromKafkaViaSDF<K, V>>
transform) {
return PTransformReplacement.of(
transform.getPipeline().begin(),
new ReadFromKafkaViaUnbounded<>(
transform.getTransform().kafkaRead,
transform.getTransform().keyCoder,
transform.getTransform().valueCoder));
}
@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
private static class ReadFromKafkaViaUnbounded<K, V>
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
Read<K, V> kafkaRead;
Coder<K> keyCoder;
Coder<V> valueCoder;
ReadFromKafkaViaUnbounded(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder) {
this.kafkaRead = kafkaRead;
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}
@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
org.apache.beam.sdk.io.Read.from(
kafkaRead
.toBuilder()
.setKeyCoder(keyCoder)
.setValueCoder(valueCoder)
.build()
.makeSource());
PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
if (kafkaRead.getMaxNumRecords() < Long.MAX_VALUE || kafkaRead.getMaxReadTime() != null) {
transform =
unbounded
.withMaxReadTime(kafkaRead.getMaxReadTime())
.withMaxNumRecords(kafkaRead.getMaxNumRecords());
}
return input.getPipeline().apply(transform);
}
}
static class ReadFromKafkaViaSDF<K, V>
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
Read<K, V> kafkaRead;
Coder<K> keyCoder;
Coder<V> valueCoder;
ReadFromKafkaViaSDF(Read<K, V> kafkaRead, Coder<K> keyCoder, Coder<V> valueCoder) {
this.kafkaRead = kafkaRead;
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}
@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
ReadSourceDescriptors<K, V> readTransform =
ReadSourceDescriptors.<K, V>read()
.withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
.withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
.withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
.withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
.withManualWatermarkEstimator()
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
.withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
readTransform = readTransform.commitOffsets();
}
PCollection<KafkaSourceDescriptor> output;
if (kafkaRead.isDynamicRead()) {
Set<String> topics = new HashSet<>();
if (kafkaRead.getTopics() != null && kafkaRead.getTopics().size() > 0) {
topics.addAll(kafkaRead.getTopics());
}
if (kafkaRead.getTopicPartitions() != null && kafkaRead.getTopicPartitions().size() > 0) {
for (TopicPartition topicPartition : kafkaRead.getTopicPartitions()) {
topics.add(topicPartition.topic());
}
}
output =
input
.getPipeline()
.apply(Impulse.create())
.apply(
MapElements.into(
TypeDescriptors.kvs(
new TypeDescriptor<byte[]>() {}, new TypeDescriptor<byte[]>() {}))
.via(element -> KV.of(element, element)))
.apply(
ParDo.of(
new WatchKafkaTopicPartitionDoFn(
kafkaRead.getWatchTopicPartitionDuration(),
kafkaRead.getConsumerFactoryFn(),
kafkaRead.getCheckStopReadingFn(),
kafkaRead.getConsumerConfig(),
kafkaRead.getStartReadTime(),
topics.stream().collect(Collectors.toList()))));
} else {
output =
input
.getPipeline()
.apply(Impulse.create())
.apply(ParDo.of(new GenerateKafkaSourceDescriptor(kafkaRead)));
}
return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
}
}
/**
* A DoFn which generates {@link KafkaSourceDescriptor} based on the configuration of {@link
* Read}.
*/
@VisibleForTesting
static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescriptor> {
GenerateKafkaSourceDescriptor(Read read) {
this.consumerConfig = read.getConsumerConfig();
this.consumerFactoryFn = read.getConsumerFactoryFn();
this.topics = read.getTopics();
this.topicPartitions = read.getTopicPartitions();
this.startReadTime = read.getStartReadTime();
}
private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
consumerFactoryFn;
private final List<TopicPartition> topicPartitions;
private final Instant startReadTime;
@VisibleForTesting final Map<String, Object> consumerConfig;
@VisibleForTesting final List<String> topics;
@ProcessElement
public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
List<TopicPartition> partitions = new ArrayList<>(topicPartitions);
if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
for (String topic : topics) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
}
for (TopicPartition topicPartition : partitions) {
receiver.output(KafkaSourceDescriptor.of(topicPartition, null, startReadTime, null));
}
}
}
private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
return (getKeyCoder() != null)
? getKeyCoder()
: getKeyDeserializerProvider().getCoder(coderRegistry);
}
private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
return (getValueCoder() != null)
? getValueCoder()
: getValueDeserializerProvider().getCoder(coderRegistry);
}
/**
* Creates an {@link UnboundedSource UnboundedSource&lt;KafkaRecord&lt;K, V&gt;, ?&gt;} with the
* configuration in {@link Read}. Primary use case is unit tests, should not be used in an
* application.
*/
@VisibleForTesting
UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
return new KafkaUnboundedSource<>(this, -1);
}
// utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions
private static <KeyT, ValueT, OutT>
SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(
final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
return record -> fn.apply(record.getKV());
}
@SuppressWarnings("unchecked")
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
List<String> topics = getTopics();
List<TopicPartition> topicPartitions = getTopicPartitions();
if (topics.size() > 0) {
builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
} else if (topicPartitions.size() > 0) {
builder.add(
DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
.withLabel("Topic Partition/s"));
}
Set<String> disallowedConsumerPropertiesKeys =
KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
String key = conf.getKey();
if (!disallowedConsumerPropertiesKeys.contains(key)) {
Object value =
DisplayData.inferType(conf.getValue()) != null
? conf.getValue()
: String.valueOf(conf.getValue());
builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
}
}
}
}
/**
* A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but removes
* Kafka metatdata and returns a {@link PCollection} of {@link KV}. See {@link KafkaIO} for more
* information on usage and configuration of reader.
*/
public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
private final Read<K, V> read;
TypedWithoutMetadata(Read<K, V> read) {
super("KafkaIO.Read");
this.read = read;
}
@Override
public PCollection<KV<K, V>> expand(PBegin begin) {
return begin
.apply(read)
.apply(
"Remove Kafka Metadata",
ParDo.of(
new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
ctx.output(ctx.element().getKV());
}
}));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
read.populateDisplayData(builder);
}
}
/**
* A {@link PTransform} to read from {@link KafkaSourceDescriptor}. See {@link KafkaIO} for more
* information on usage and configuration. See {@link ReadFromKafkaDoFn} for more implementation
* details.
*
* <p>During expansion, if {@link ReadSourceDescriptors#isCommitOffsetEnabled()} is {@code true},
* the transform will expand to:
*
* <pre>{@code
* PCollection<KafkaSourceDescriptor> --> ParDo(ReadFromKafkaDoFn<KafkaSourceDescriptor, KV<KafkaSourceDescriptor, KafkaRecord>>) --> Reshuffle() --> Map(output KafkaRecord)
* |
* --> KafkaCommitOffset
* }</pre>
*
* . Note that this expansion is not supported when running with x-lang on Dataflow.
*/
@Experimental(Kind.PORTABILITY)
@AutoValue
@AutoValue.CopyAnnotations
@SuppressWarnings({"rawtypes"})
public abstract static class ReadSourceDescriptors<K, V>
extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
abstract Map<String, Object> getConsumerConfig();
abstract @Nullable Map<String, Object> getOffsetConsumerConfig();
abstract @Nullable DeserializerProvider getKeyDeserializerProvider();
abstract @Nullable DeserializerProvider getValueDeserializerProvider();
abstract @Nullable Coder<K> getKeyCoder();
abstract @Nullable Coder<V> getValueCoder();
abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
getConsumerFactoryFn();
abstract @Nullable SerializableFunction<TopicPartition, Boolean> getCheckStopReadingFn();
abstract @Nullable SerializableFunction<KafkaRecord<K, V>, Instant>
getExtractOutputTimestampFn();
abstract @Nullable SerializableFunction<Instant, WatermarkEstimator<Instant>>
getCreateWatermarkEstimatorFn();
abstract boolean isCommitOffsetEnabled();
abstract @Nullable TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
@AutoValue.Builder
abstract static class Builder<K, V> {
abstract ReadSourceDescriptors.Builder<K, V> setConsumerConfig(Map<String, Object> config);
abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
Map<String, Object> offsetConsumerConfig);
abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
abstract ReadSourceDescriptors.Builder<K, V> setCheckStopReadingFn(
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn);
abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
DeserializerProvider deserializerProvider);
abstract ReadSourceDescriptors.Builder<K, V> setValueDeserializerProvider(
DeserializerProvider deserializerProvider);
abstract ReadSourceDescriptors.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
abstract ReadSourceDescriptors.Builder<K, V> setValueCoder(Coder<V> valueCoder);
abstract ReadSourceDescriptors.Builder<K, V> setExtractOutputTimestampFn(
SerializableFunction<KafkaRecord<K, V>, Instant> fn);
abstract ReadSourceDescriptors.Builder<K, V> setCreateWatermarkEstimatorFn(
SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
abstract ReadSourceDescriptors.Builder<K, V> setCommitOffsetEnabled(
boolean commitOffsetEnabled);
abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> policy);
abstract ReadSourceDescriptors<K, V> build();
}
public static <K, V> ReadSourceDescriptors<K, V> read() {
return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
.setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
.setCommitOffsetEnabled(false)
.build()
.withProcessingTime()
.withMonotonicallyIncreasingWatermarkEstimator();
}
/**
* Sets the bootstrap servers to use for the Kafka consumer if unspecified via
* KafkaSourceDescriptor#getBootStrapServers()}.
*/
public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
return withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}
public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
DeserializerProvider<K> deserializerProvider) {
return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
}
public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
DeserializerProvider<V> deserializerProvider) {
return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
}
/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
* <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
* runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
* however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
* provide the key coder explicitly.
*/
public ReadSourceDescriptors<K, V> withKeyDeserializer(
Class<? extends Deserializer<K>> keyDeserializer) {
return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
}
/**
* Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
*
* <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize value objects at
* runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}
* class, however in case that fails, you can use {@link #withValueDeserializerAndCoder(Class,
* Coder)} to provide the value coder explicitly.
*/
public ReadSourceDescriptors<K, V> withValueDeserializer(
Class<? extends Deserializer<V>> valueDeserializer) {
return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
}
/**
* Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
*
* <p>Use this method to override the coder inference performed within {@link
* #withKeyDeserializer(Class)}.
*/
public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
}
/**
* Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
* {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
*
* <p>Use this method to override the coder inference performed within {@link
* #withValueDeserializer(Class)}.
*/
public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
}
/**
* A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
* supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
*/
public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
}
/**
* A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn}
* should stop reading from the given {@link TopicPartition}.
*/
public ReadSourceDescriptors<K, V> withCheckStopReadingFn(
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
}
/**
* Updates configuration for the main consumer. This method merges updates from the provided map
* with any prior updates using {@link KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES} as the starting
* configuration.
*
* <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the backend:
*
* <ol>
* <li>the main consumer which reads data from kafka.
* <li>the secondary offset consumer which is used to estimate the backlog by fetching the
* latest offset.
* </ol>
*
* <p>See {@link #withConsumerConfigOverrides} for overriding the configuration instead of
* updating it.
*
* <p>See {@link #withOffsetConsumerConfigOverrides} for configuring the secondary offset
* consumer.
*/
public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
Map<String, Object> configUpdates) {
Map<String, Object> config =
KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
return toBuilder().setConsumerConfig(config).build();
}
/**
* A function to calculate output timestamp for a given {@link KafkaRecord}. The default value
* is {@link #withProcessingTime()}.
*/
public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
return toBuilder().setExtractOutputTimestampFn(fn).build();
}
/**
* A function to create a {@link WatermarkEstimator}. The default value is {@link
* MonotonicallyIncreasing}.
*/
public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
}
/** Use the log append time as the output timestamp. */
public ReadSourceDescriptors<K, V> withLogAppendTime() {
return withExtractOutputTimestampFn(
ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
}
/** Use the processing time as the output timestamp. */
public ReadSourceDescriptors<K, V> withProcessingTime() {
return withExtractOutputTimestampFn(
ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
}
/** Use the creation time of {@link KafkaRecord} as the output timestamp. */
public ReadSourceDescriptors<K, V> withCreateTime() {
return withExtractOutputTimestampFn(
ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
}
/** Use the {@link WallTime} as the watermark estimator. */
public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
return withCreatWatermarkEstimatorFn(
state -> {
return new WallTime(state);
});
}
/** Use the {@link MonotonicallyIncreasing} as the watermark estimator. */
public ReadSourceDescriptors<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
return withCreatWatermarkEstimatorFn(
state -> {
return new MonotonicallyIncreasing(state);
});
}
/** Use the {@link Manual} as the watermark estimator. */
public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
return withCreatWatermarkEstimatorFn(
state -> {
return new Manual(state);
});
}
/**
* Sets "isolation_level" to "read_committed" in Kafka consumer configuration. This ensures that
* the consumer does not read uncommitted messages. Kafka version 0.11 introduced transactional
* writes. Applications requiring end-to-end exactly-once semantics should only read committed
* messages. See JavaDoc for {@link KafkaConsumer} for more description.
*/
public ReadSourceDescriptors<K, V> withReadCommitted() {
return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
}
/**
* Enable committing record offset. If {@link #withReadCommitted()} or {@link
* ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set together with {@link #commitOffsets()},
* {@link #commitOffsets()} will be ignored.
*/
public ReadSourceDescriptors<K, V> commitOffsets() {
return toBuilder().setCommitOffsetEnabled(true).build();
}
/**
* Set additional configuration for the offset consumer. It may be required for a secured Kafka
* cluster, especially when you see similar WARN log message {@code exception while fetching
* latest offset for partition {}. will be retried}.
*
* <p>In {@link ReadFromKafkaDoFn}, there are two consumers running in the backend:
*
* <ol>
* <li>the main consumer which reads data from kafka.
* <li>the secondary offset consumer which is used to estimate the backlog by fetching the
* latest offset.
* </ol>
*
* <p>By default, offset consumer inherits the configuration from main consumer, with an
* auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
* which requires additional configuration.
*
* <p>See {@link #withConsumerConfigUpdates} for configuring the main consumer.
*/
public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(
Map<String, Object> offsetConsumerConfig) {
return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
}
/**
* Replaces the configuration for the main consumer.
*
* <p>In {@link ReadFromKafkaDoFn}, there are two consumers running in the backend:
*
* <ol>
* <li>the main consumer which reads data from kafka.
* <li>the secondary offset consumer which is used to estimate the backlog by fetching the
* latest offset.
* </ol>
*
* <p>By default, main consumer uses the configuration from {@link
* KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
*
* <p>See {@link #withConsumerConfigUpdates} for updating the configuration instead of
* overriding it.
*/
public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(
Map<String, Object> consumerConfig) {
return toBuilder().setConsumerConfig(consumerConfig).build();
}
ReadAllFromRow forExternalBuild() {
return new ReadAllFromRow(this);
}
/**
* A transform that is used in cross-language case. The input {@link Row} should be encoded with
* an equivalent schema as {@link KafkaSourceDescriptor}.
*/
private static class ReadAllFromRow<K, V>
extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
private final ReadSourceDescriptors<K, V> readViaSDF;
ReadAllFromRow(ReadSourceDescriptors read) {
readViaSDF = read;
}
@Override
public PCollection<KV<K, V>> expand(PCollection<Row> input) {
return input
.apply(Convert.fromRows(KafkaSourceDescriptor.class))
.apply(readViaSDF)
.apply(
ParDo.of(
new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
@ProcessElement
public void processElement(
@Element KafkaRecord element, OutputReceiver<KV<K, V>> outputReceiver) {
outputReceiver.output(element.getKV());
}
}))
.setCoder(KvCoder.<K, V>of(readViaSDF.getKeyCoder(), readViaSDF.getValueCoder()));
}
}
/**
* Set the {@link TimestampPolicyFactory}. If the {@link TimestampPolicyFactory} is given, the
* output timestamp will be computed by the {@link
* TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} and {@link Manual} is
* used as the watermark estimator.
*/
ReadSourceDescriptors<K, V> withTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory) {
return toBuilder()
.setTimestampPolicyFactory(timestampPolicyFactory)
.build()
.withManualWatermarkEstimator();
}
@Override
public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required");
checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required");
if (!ConsumerSpEL.hasOffsetsForTimes()) {
LOG.warn(
"Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and "
+ "may not be supported in next release of Apache Beam. "
+ "Please upgrade your Kafka client version.",
AppInfoParser.getVersion());
}
if (isCommitOffsetEnabled()) {
if (configuredKafkaCommit()) {
LOG.info(
"Either read_committed or auto_commit is set together with commitOffsetEnabled but you "
+ "only need one of them. The commitOffsetEnabled is going to be ignored");
}
}
if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
LOG.warn(
"The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail.");
}
CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
Coder<KafkaRecord<K, V>> recordCoder = KafkaRecordCoder.of(keyCoder, valueCoder);
try {
PCollection<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> outputWithDescriptor =
input
.apply(ParDo.of(new ReadFromKafkaDoFn<K, V>(this)))
.setCoder(
KvCoder.of(
input
.getPipeline()
.getSchemaRegistry()
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));
if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
outputWithDescriptor =
outputWithDescriptor
.apply(Reshuffle.viaRandomKey())
.setCoder(
KvCoder.of(
input
.getPipeline()
.getSchemaRegistry()
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));
PCollection<Void> unused = outputWithDescriptor.apply(new KafkaCommitOffset<K, V>(this));
unused.setCoder(VoidCoder.of());
}
PCollection<KafkaRecord<K, V>> output =
outputWithDescriptor
.apply(
MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {})
.via(element -> element.getValue()))
.setCoder(recordCoder);
return output;
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e.getMessage());
}
}
private Coder<K> getKeyCoder(CoderRegistry coderRegistry) {
return (getKeyCoder() != null)
? getKeyCoder()
: getKeyDeserializerProvider().getCoder(coderRegistry);
}
private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
return (getValueCoder() != null)
? getValueCoder()
: getValueDeserializerProvider().getCoder(coderRegistry);
}
private boolean configuredKafkaCommit() {
return getConsumerConfig().get("isolation.level") == "read_committed"
|| Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
static class ExtractOutputTimestampFns<K, V> {
public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useProcessingTime() {
return record -> Instant.now();
}
public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useCreateTime() {
return record -> {
checkArgument(
record.getTimestampType() == KafkaTimestampType.CREATE_TIME,
"Kafka record's timestamp is not 'CREATE_TIME' "
+ "(topic: %s, partition %s, offset %s, timestamp type '%s')",
record.getTopic(),
record.getPartition(),
record.getOffset(),
record.getTimestampType());
return new Instant(record.getTimestamp());
};
}
public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useLogAppendTime() {
return record -> {
checkArgument(
record.getTimestampType() == KafkaTimestampType.LOG_APPEND_TIME,
"Kafka record's timestamp is not 'LOG_APPEND_TIME' "
+ "(topic: %s, partition %s, offset %s, timestamp type '%s')",
record.getTopic(),
record.getPartition(),
record.getOffset(),
record.getTimestampType());
return new Instant(record.getTimestamp());
};
}
}
}
////////////////////////////////////////////////////////////////////////////////////////////////
private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
/** Static class, prevent instantiation. */
private KafkaIO() {}
//////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
/**
* A {@link PTransform} to write to a Kafka topic with ProducerRecord's. See {@link KafkaIO} for
* more information on usage and configuration.
*/
@AutoValue
@AutoValue.CopyAnnotations
@SuppressWarnings({"rawtypes"})
public abstract static class WriteRecords<K, V>
extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {
// TODO (Version 3.0): Create the only one generic {@code Write<T>} transform which will be
// parameterized depending on type of input collection (KV, ProducerRecords, etc). In such case,
// we shouldn't have to duplicate the same API for similar transforms like {@link Write} and
// {@link WriteRecords}. See example at {@link PubsubIO.Write}.
abstract @Nullable String getTopic();
abstract Map<String, Object> getProducerConfig();
abstract @Nullable SerializableFunction<Map<String, Object>, Producer<K, V>>
getProducerFactoryFn();
abstract @Nullable Class<? extends Serializer<K>> getKeySerializer();
abstract @Nullable Class<? extends Serializer<V>> getValueSerializer();
abstract @Nullable KafkaPublishTimestampFunction<ProducerRecord<K, V>>
getPublishTimestampFunction();
// Configuration for EOS sink
abstract boolean isEOS();
abstract @Nullable String getSinkGroupId();
abstract int getNumShards();
abstract @Nullable SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>>
getConsumerFactoryFn();
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
abstract static class Builder<K, V> {
abstract Builder<K, V> setTopic(String topic);
abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
abstract Builder<K, V> setProducerFactoryFn(
SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
abstract Builder<K, V> setPublishTimestampFunction(
KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction);
abstract Builder<K, V> setEOS(boolean eosEnabled);
abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
abstract Builder<K, V> setNumShards(int numShards);
abstract Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> fn);
abstract WriteRecords<K, V> build();
}
/**
* Returns a new {@link Write} transform with Kafka producer pointing to {@code
* bootstrapServers}.
*/
public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {
return withProducerConfigUpdates(
ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
}
/**
* Sets the default Kafka topic to write to. Use {@code ProducerRecords} to set topic name per
* published record.
*/
public WriteRecords<K, V> withTopic(String topic) {
return toBuilder().setTopic(topic).build();
}
/**
* Sets a {@link Serializer} for serializing key (if any) to bytes.
*
* <p>A key is optional while writing to Kafka. Note when a key is set, its hash is used to
* determine partition in Kafka (see {@link ProducerRecord} for more details).
*/
public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
return toBuilder().setKeySerializer(keySerializer).build();
}
/** Sets a {@link Serializer} for serializing value to bytes. */
public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
return toBuilder().setValueSerializer(valueSerializer).build();
}
/**
* Adds the given producer properties, overriding old values of properties with the same key.
*
* @deprecated as of version 2.13. Use {@link #withProducerConfigUpdates(Map)} instead.
*/
@Deprecated
public WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
Map<String, Object> config =
KafkaIOUtils.updateKafkaProperties(getProducerConfig(), configUpdates);
return toBuilder().setProducerConfig(config).build();
}
/**
* Update configuration for the producer. Note that the default producer properties will not be
* completely overridden. This method only updates the value which has the same key.
*
* <p>By default, the producer uses the configuration from {@link #DEFAULT_PRODUCER_PROPERTIES}.
*/
public WriteRecords<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
Map<String, Object> config =
KafkaIOUtils.updateKafkaProperties(getProducerConfig(), configUpdates);
return toBuilder().setProducerConfig(config).build();
}
/**
* Sets a custom function to create Kafka producer. Primarily used for tests. Default is {@link
* KafkaProducer}
*/
public WriteRecords<K, V> withProducerFactoryFn(
SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
}
/**
* The timestamp for each record being published is set to timestamp of the element in the
* pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br>
* NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is
* processing messages from the past, they might be deleted immediately by Kafka after being
* published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
*/
public WriteRecords<K, V> withInputTimestamp() {
return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
}
/**
* A function to provide timestamp for records being published. <br>
* NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is
* processing messages from the past, they might be deleted immediately by Kafka after being
* published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
*
* @deprecated use {@code ProducerRecords} to set publish timestamp.
*/
@Deprecated
public WriteRecords<K, V> withPublishTimestampFunction(
KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) {
return toBuilder().setPublishTimestampFunction(timestampFunction).build();
}
/**
* Provides exactly-once semantics while writing to Kafka, which enables applications with
* end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Beam
* pipelines. It ensures that records written to sink are committed on Kafka exactly once, even
* in the case of retries during pipeline execution even when some processing is retried.
* Retries typically occur when workers restart (as in failure recovery), or when the work is
* redistributed (as in an autoscaling event).
*
* <p>Beam runners typically provide exactly-once semantics for results of a pipeline, but not
* for side effects from user code in transform. If a transform such as Kafka sink writes to an
* external system, those writes might occur more than once. When EOS is enabled here, the sink
* transform ties checkpointing semantics in compatible Beam runners and transactions in Kafka
* (version 0.11+) to ensure a record is written only once. As the implementation relies on
* runners checkpoint semantics, not all the runners are compatible. The sink throws an
* exception during initialization if the runner is not explicitly allowed. The Dataflow, Flink,
* and Spark runners are compatible.
*
* <p>Note on performance: Exactly-once sink involves two shuffles of the records. In addition
* to cost of shuffling the records among workers, the records go through 2
* serialization-deserialization cycles. Depending on volume and cost of serialization, the CPU
* cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e.
* serializing them to byte before writing to Kafka sink).
*
* @param numShards Sets sink parallelism. The state metadata stored on Kafka is stored across
* this many virtual partitions using {@code sinkGroupId}. A good rule of thumb is to set
* this to be around number of partitions in Kafka topic.
* @param sinkGroupId The <i>group id</i> used to store small amount of state as metadata on
* Kafka. It is similar to <i>consumer group id</i> used with a {@link KafkaConsumer}. Each
* job should use a unique group id so that restarts/updates of job preserve the state to
* ensure exactly-once semantics. The state is committed atomically with sink transactions
* on Kafka. See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)} for more
* information. The sink performs multiple sanity checks during initialization to catch
* common mistakes so that it does not end up using state that does not <i>seem</i> to be
* written by the same job.
*/
public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
KafkaExactlyOnceSink.ensureEOSSupport();
checkArgument(numShards >= 1, "numShards should be >= 1");
checkArgument(sinkGroupId != null, "sinkGroupId is required for exactly-once sink");
return toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();
}
/**
* When exactly-once semantics are enabled (see {@link #withEOS(int, String)}), the sink needs
* to fetch previously stored state with Kafka topic. Fetching the metadata requires a consumer.
* Similar to {@link Read#withConsumerFactoryFn(SerializableFunction)}, a factory function can
* be supplied if required in a specific case. The default is {@link KafkaConsumer}.
*/
public WriteRecords<K, V> withConsumerFactoryFn(
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
}
@Override
public PDone expand(PCollection<ProducerRecord<K, V>> input) {
checkArgument(
getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
"withBootstrapServers() is required");
checkArgument(getKeySerializer() != null, "withKeySerializer() is required");
checkArgument(getValueSerializer() != null, "withValueSerializer() is required");
if (isEOS()) {
checkArgument(getTopic() != null, "withTopic() is required when isEOS() is true");
KafkaExactlyOnceSink.ensureEOSSupport();
// TODO: Verify that the group_id does not have existing state stored on Kafka unless
// this is an upgrade. This avoids issues with simple mistake of reusing group_id
// across multiple runs or across multiple jobs. This is checked when the sink
// transform initializes while processing the output. It might be better to
// check here to catch common mistake.
input.apply(new KafkaExactlyOnceSink<>(this));
} else {
input.apply(ParDo.of(new KafkaWriter<>(this)));
}
return PDone.in(input.getPipeline());
}
@Override
public void validate(PipelineOptions options) {
if (isEOS()) {
String runner = options.getRunner().getName();
if ("org.apache.beam.runners.direct.DirectRunner".equals(runner)
|| runner.startsWith("org.apache.beam.runners.dataflow.")
|| runner.startsWith("org.apache.beam.runners.spark.")
|| runner.startsWith("org.apache.beam.runners.flink.")) {
return;
}
throw new UnsupportedOperationException(
runner
+ " is not a runner known to be compatible with Kafka exactly-once sink. This"
+ " implementation of exactly-once sink relies on specific checkpoint guarantees."
+ " Only the runners with known to have compatible checkpoint semantics are"
+ " allowed.");
}
}
// set config defaults
private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
ImmutableMap.of(ProducerConfig.RETRIES_CONFIG, 3);
/** A set of properties that are not required or don't make sense for our producer. */
private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES =
ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead");
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));
Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();
for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
String key = conf.getKey();
if (!ignoredProducerPropertiesKeys.contains(key)) {
Object value =
DisplayData.inferType(conf.getValue()) != null
? conf.getValue()
: String.valueOf(conf.getValue());
builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
}
}
}
}
/**
* A {@link PTransform} to write to a Kafka topic with KVs . See {@link KafkaIO} for more
* information on usage and configuration.
*/
@AutoValue
@AutoValue.CopyAnnotations
@SuppressWarnings({"rawtypes"})
public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
// TODO (Version 3.0): Create the only one generic {@code Write<T>} transform which will be
// parameterized depending on type of input collection (KV, ProducerRecords, etc). In such case,
// we shouldn't have to duplicate the same API for similar transforms like {@link Write} and
// {@link WriteRecords}. See example at {@link PubsubIO.Write}.
abstract @Nullable String getTopic();
abstract WriteRecords<K, V> getWriteRecordsTransform();
abstract Builder<K, V> toBuilder();
@Experimental(Kind.PORTABILITY)
@AutoValue.Builder
abstract static class Builder<K, V>
implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {
abstract Builder<K, V> setTopic(String topic);
abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> transform);
abstract Write<K, V> build();
@Override
public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
External.Configuration configuration) {
setTopic(configuration.topic);
Map<String, Object> producerConfig = new HashMap<>(configuration.producerConfig);
Class keySerializer = resolveClass(configuration.keySerializer);
Class valSerializer = resolveClass(configuration.valueSerializer);
WriteRecords<K, V> writeRecords =
KafkaIO.<K, V>writeRecords()
.withProducerConfigUpdates(producerConfig)
.withKeySerializer(keySerializer)
.withValueSerializer(valSerializer)
.withTopic(configuration.topic);
setWriteRecordsTransform(writeRecords);
return build();
}
}
/** Exposes {@link KafkaIO.Write} as an external transform for cross-language usage. */
@Experimental(Kind.PORTABILITY)
@AutoService(ExternalTransformRegistrar.class)
public static class External implements ExternalTransformRegistrar {
public static final String URN = "beam:external:java:kafka:write:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(
URN,
(Class<KafkaIO.Write.Builder<?, ?>>) (Class<?>) AutoValue_KafkaIO_Write.Builder.class);
}
/** Parameters class to expose the Write transform to an external SDK. */
public static class Configuration {
private Map<String, String> producerConfig;
private String topic;
private String keySerializer;
private String valueSerializer;
public void setProducerConfig(Map<String, String> producerConfig) {
this.producerConfig = producerConfig;
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
}
}
/** Used mostly to reduce using of boilerplate of wrapping {@link WriteRecords} methods. */
private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> transform) {
return toBuilder().setWriteRecordsTransform(transform).build();
}
/**
* Wrapper method over {@link WriteRecords#withBootstrapServers(String)}, used to keep the
* compatibility with old API based on KV type of element.
*/
public Write<K, V> withBootstrapServers(String bootstrapServers) {
return withWriteRecordsTransform(
getWriteRecordsTransform().withBootstrapServers(bootstrapServers));
}
/**
* Wrapper method over {@link WriteRecords#withTopic(String)}, used to keep the compatibility
* with old API based on KV type of element.
*/
public Write<K, V> withTopic(String topic) {
return toBuilder()
.setTopic(topic)
.setWriteRecordsTransform(getWriteRecordsTransform().withTopic(topic))
.build();
}
/**
* Wrapper method over {@link WriteRecords#withKeySerializer(Class)}, used to keep the
* compatibility with old API based on KV type of element.
*/
public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
return withWriteRecordsTransform(getWriteRecordsTransform().withKeySerializer(keySerializer));
}
/**
* Wrapper method over {@link WriteRecords#withValueSerializer(Class)}, used to keep the
* compatibility with old API based on KV type of element.
*/
public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
return withWriteRecordsTransform(
getWriteRecordsTransform().withValueSerializer(valueSerializer));
}
/**
* Wrapper method over {@link WriteRecords#withProducerFactoryFn(SerializableFunction)}, used to
* keep the compatibility with old API based on KV type of element.
*/
public Write<K, V> withProducerFactoryFn(
SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {
return withWriteRecordsTransform(
getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn));
}
/**
* Wrapper method over {@link WriteRecords#withInputTimestamp()}, used to keep the compatibility
* with old API based on KV type of element.
*/
public Write<K, V> withInputTimestamp() {
return withWriteRecordsTransform(getWriteRecordsTransform().withInputTimestamp());
}
/**
* Wrapper method over {@link
* WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}, used to keep the
* compatibility with old API based on KV type of element.
*
* @deprecated use {@link WriteRecords} and {@code ProducerRecords} to set publish timestamp.
*/
@Deprecated
@SuppressWarnings({"unchecked", "rawtypes"})
public Write<K, V> withPublishTimestampFunction(
KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
return withWriteRecordsTransform(
getWriteRecordsTransform()
.withPublishTimestampFunction(new PublishTimestampFunctionKV(timestampFunction)));
}
/**
* Wrapper method over {@link WriteRecords#withEOS(int, String)}, used to keep the compatibility
* with old API based on KV type of element.
*/
public Write<K, V> withEOS(int numShards, String sinkGroupId) {
return withWriteRecordsTransform(getWriteRecordsTransform().withEOS(numShards, sinkGroupId));
}
/**
* Wrapper method over {@link WriteRecords#withConsumerFactoryFn(SerializableFunction)}, used to
* keep the compatibility with old API based on KV type of element.
*/
public Write<K, V> withConsumerFactoryFn(
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
return withWriteRecordsTransform(
getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn));
}
/**
* Adds the given producer properties, overriding old values of properties with the same key.
*
* @deprecated as of version 2.13. Use {@link #withProducerConfigUpdates(Map)} instead.
*/
@Deprecated
public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
return withWriteRecordsTransform(
getWriteRecordsTransform().updateProducerProperties(configUpdates));
}
/**
* Update configuration for the producer. Note that the default producer properties will not be
* completely overridden. This method only updates the value which has the same key.
*
* <p>By default, the producer uses the configuration from {@link
* WriteRecords#DEFAULT_PRODUCER_PROPERTIES}.
*/
public Write<K, V> withProducerConfigUpdates(Map<String, Object> configUpdates) {
return withWriteRecordsTransform(
getWriteRecordsTransform().withProducerConfigUpdates(configUpdates));
}
@Override
public PDone expand(PCollection<KV<K, V>> input) {
checkArgument(getTopic() != null, "withTopic() is required");
KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
return input
.apply(
"Kafka ProducerRecord",
MapElements.via(
new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() {
@Override
public ProducerRecord<K, V> apply(KV<K, V> element) {
return new ProducerRecord<>(getTopic(), element.getKey(), element.getValue());
}
}))
.setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()))
.apply(getWriteRecordsTransform());
}
@Override
public void validate(PipelineOptions options) {
getWriteRecordsTransform().validate(options);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
getWriteRecordsTransform().populateDisplayData(builder);
}
/**
* Writes just the values to Kafka. This is useful for writing collections of values rather
* thank {@link KV}s.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public PTransform<PCollection<V>, PDone> values() {
return new KafkaValueWrite<K, V>(this.withKeySerializer((Class) StringSerializer.class));
}
/**
* Wrapper class which allows to use {@code KafkaPublishTimestampFunction<KV<K, V>} with {@link
* WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}.
*/
private static class PublishTimestampFunctionKV<K, V>
implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {
private KafkaPublishTimestampFunction<KV<K, V>> fn;
public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> fn) {
this.fn = fn;
}
@Override
public Instant getTimestamp(ProducerRecord<K, V> e, Instant ts) {
return fn.getTimestamp(KV.of(e.key(), e.value()), ts);
}
}
}
/**
* Same as {@code Write<K, V>} without a Key. Null is used for key as it is the convention is
* Kafka when there is no key specified. Majority of Kafka writers don't specify a key.
*/
private static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> {
private final Write<K, V> kvWriteTransform;
private KafkaValueWrite(Write<K, V> kvWriteTransform) {
this.kvWriteTransform = kvWriteTransform;
}
@Override
public PDone expand(PCollection<V> input) {
return input
.apply(
"Kafka values with default key",
MapElements.via(
new SimpleFunction<V, KV<K, V>>() {
@Override
public KV<K, V> apply(V element) {
return KV.of(null, element);
}
}))
.setCoder(KvCoder.of(new NullOnlyCoder<>(), input.getCoder()))
.apply(kvWriteTransform);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
kvWriteTransform.populateDisplayData(builder);
}
}
private static class NullOnlyCoder<T> extends AtomicCoder<T> {
@Override
public void encode(T value, OutputStream outStream) {
checkArgument(value == null, "Can only encode nulls");
// Encode as no bytes.
}
@Override
public T decode(InputStream inStream) {
return null;
}
}
private static Class resolveClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not find class: " + className);
}
}
}