<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<!-- NewPage -->
<html lang="en">
<head>
<!-- Generated by javadoc -->
<title>KafkaIO (Apache Beam 2.38.0-SNAPSHOT)</title>
<link rel="stylesheet" type="text/css" href="../../../../../../stylesheet.css" title="Style">
<script type="text/javascript" src="../../../../../../script.js"></script>
</head>
<body>
<script type="text/javascript"><!--
try {
if (location.href.indexOf('is-external=true') == -1) {
parent.document.title="KafkaIO (Apache Beam 2.38.0-SNAPSHOT)";
}
}
catch(err) {
}
//-->
var methods = {"i0":9,"i1":9,"i2":9,"i3":9,"i4":9};
var tabs = {65535:["t0","All Methods"],1:["t1","Static Methods"],8:["t4","Concrete Methods"]};
var altColor = "altColor";
var rowColor = "rowColor";
var tableTab = "tableTab";
var activeTableTab = "activeTableTab";
</script>
<noscript>
<div>JavaScript is disabled on your browser.</div>
</noscript>
<!-- ========= START OF TOP NAVBAR ======= -->
<div class="topNav"><a name="navbar.top">
<!-- -->
</a>
<div class="skipNav"><a href="#skip.navbar.top" title="Skip navigation links">Skip navigation links</a></div>
<a name="navbar.top.firstrow">
<!-- -->
</a>
<ul class="navList" title="Navigation">
<li><a href="../../../../../../overview-summary.html">Overview</a></li>
<li><a href="package-summary.html">Package</a></li>
<li class="navBarCell1Rev">Class</li>
<li><a href="package-tree.html">Tree</a></li>
<li><a href="../../../../../../deprecated-list.html">Deprecated</a></li>
<li><a href="../../../../../../index-all.html">Index</a></li>
<li><a href="../../../../../../help-doc.html">Help</a></li>
</ul>
</div>
<div class="subNav">
<ul class="navList">
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaCommitOffset.html" title="class in org.apache.beam.sdk.io.kafka"><span class="typeNameLink">Prev&nbsp;Class</span></a></li>
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><span class="typeNameLink">Next&nbsp;Class</span></a></li>
</ul>
<ul class="navList">
<li><a href="../../../../../../index.html?org/apache/beam/sdk/io/kafka/KafkaIO.html" target="_top">Frames</a></li>
<li><a href="KafkaIO.html" target="_top">No&nbsp;Frames</a></li>
</ul>
<ul class="navList" id="allclasses_navbar_top">
<li><a href="../../../../../../allclasses-noframe.html">All&nbsp;Classes</a></li>
</ul>
<div>
<script type="text/javascript"><!--
allClassesLink = document.getElementById("allclasses_navbar_top");
if(window==top) {
allClassesLink.style.display = "block";
}
else {
allClassesLink.style.display = "none";
}
//-->
</script>
</div>
<div>
<ul class="subNavList">
<li>Summary:&nbsp;</li>
<li><a href="#nested.class.summary">Nested</a>&nbsp;|&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.summary">Method</a></li>
</ul>
<ul class="subNavList">
<li>Detail:&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.detail">Method</a></li>
</ul>
</div>
<a name="skip.navbar.top">
<!-- -->
</a></div>
<!-- ========= END OF TOP NAVBAR ========= -->
<!-- ======== START OF CLASS DATA ======== -->
<div class="header">
<div class="subTitle">org.apache.beam.sdk.io.kafka</div>
<h2 title="Class KafkaIO" class="title">Class KafkaIO</h2>
</div>
<div class="contentContainer">
<ul class="inheritance">
<li>java.lang.Object</li>
<li>
<ul class="inheritance">
<li>org.apache.beam.sdk.io.kafka.KafkaIO</li>
</ul>
</li>
</ul>
<div class="description">
<ul class="blockList">
<li class="blockList">
<hr>
<br>
<pre><a href="../../../../../../org/apache/beam/sdk/annotations/Experimental.html" title="annotation in org.apache.beam.sdk.annotations">@Experimental</a>(<a href="../../../../../../org/apache/beam/sdk/annotations/Experimental.html#value--">value</a>=<a href="../../../../../../org/apache/beam/sdk/annotations/Experimental.Kind.html#SOURCE_SINK">SOURCE_SINK</a>)
public class <span class="typeNameLabel">KafkaIO</span>
extends java.lang.Object</pre>
<div class="block">An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics.
<h2>Read from Kafka as <a href="../../../../../../org/apache/beam/sdk/io/UnboundedSource.html" title="class in org.apache.beam.sdk.io"><code>UnboundedSource</code></a></h2>
<h3>Reading from Kafka topics</h3>
<p>The KafkaIO source returns an unbounded collection of Kafka records as <code>PCollection&lt;KafkaRecord&lt;K, V&gt;&gt;</code>. A <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaRecord.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaRecord</code></a> includes basic metadata such as the
topic-partition and offset, along with the key and value associated with a Kafka record.
<p>Although most applications consume a single topic, the source can be configured to consume
multiple topics or even a specific set of <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/TopicPartition.html?is-external=true" title="class or interface in org.apache.kafka.common"><code>TopicPartition</code></a>s.
<p>To configure a Kafka source, you must specify at a minimum the Kafka <tt>bootstrapServers</tt>,
one or more topics to consume, and key and value deserializers. For example:
<pre><code>
pipeline
.apply(KafkaIO.&lt;Long, String&gt;read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List&lt;String&gt;) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
// The above four are required configuration. This returns PCollection&lt;KafkaRecord&lt;Long, String&gt;&gt;.
// The rest of the settings are optional:
// you can further customize KafkaConsumer used to read the records by adding more
// settings for ConsumerConfig. e.g :
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))
// set event times and watermark based on 'LogAppendTime'. To provide a custom
// policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
// Use withCreateTime() with topics that have 'CreateTime' timestamps.
.withLogAppendTime()
// restrict reader to committed messages on Kafka (see method documentation).
.withReadCommitted()
// offset consumed by the pipeline can be committed back.
.commitOffsetsInFinalize()
// Specify a serializable function that determines whether to stop reading from a given
// TopicPartition at runtime. Note that only ReadFromKafkaDoFn respects this
// signal.
.withCheckStopReadingFn(new SerializableFunction&lt;TopicPartition, Boolean&gt;() {})
// finally, if you don't need Kafka metadata, you can drop it.
.withoutMetadata() // PCollection&lt;KV&lt;Long, String&gt;&gt;
)
.apply(Values.&lt;String&gt;create()) // PCollection&lt;String&gt;
...
</code></pre>
<p>Kafka provides deserializers for common types in <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/serialization/package-summary.html?is-external=true"><code>org.apache.kafka.common.serialization</code></a>. In addition to deserializers, Beam runners need a <a href="../../../../../../org/apache/beam/sdk/coders/Coder.html" title="class in org.apache.beam.sdk.coders"><code>Coder</code></a> to materialize key and value objects if necessary. In most cases, you don't need to
specify a <a href="../../../../../../org/apache/beam/sdk/coders/Coder.html" title="class in org.apache.beam.sdk.coders"><code>Coder</code></a> for the key and value in the resulting collection because the coders are
inferred from deserializer types. However, in cases when coder inference fails, they can be
specified explicitly along with deserializers using <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withKeyDeserializerAndCoder-java.lang.Class-org.apache.beam.sdk.coders.Coder-"><code>KafkaIO.Read.withKeyDeserializerAndCoder(Class, Coder)</code></a> and <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializerAndCoder-java.lang.Class-org.apache.beam.sdk.coders.Coder-"><code>KafkaIO.Read.withValueDeserializerAndCoder(Class, Coder)</code></a>. Note that Kafka messages are interpreted using
key and value <i>deserializers</i>.
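<p>For example, a minimal sketch of specifying coders explicitly when inference fails; the coder choices here are assumptions that fit <code>Long</code> keys and <code>String</code> values:
<pre><code>
 pipeline
   .apply(KafkaIO.&lt;Long, String&gt;read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      // pair each deserializer with an explicit Coder for the resulting collection
      .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
      .withValueDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
   )
   ...
</code></pre>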
<h3>Read From Kafka Dynamically</h3>
For a given Kafka bootstrap_server, KafkaIO can also detect and read from available <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/TopicPartition.html?is-external=true" title="class or interface in org.apache.kafka.common"><code>TopicPartition</code></a>s dynamically, and stop reading from ones that are removed. KafkaIO uses <code>WatchKafkaTopicPartitionDoFn</code> to emit any newly added <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/TopicPartition.html?is-external=true" title="class or interface in org.apache.kafka.common"><code>TopicPartition</code></a> and uses <code>ReadFromKafkaDoFn</code> to read from each <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaSourceDescriptor</code></a>. Dynamic read can handle
two scenarios:
<ul>
<li>A certain topic or partition is added or deleted.
<li>A certain topic or partition is added, then removed, but added back again.
</ul>
When a <code>checkStopReadingFn</code> is provided, there are two more cases that dynamic read can handle:
<ul>
<li>A certain topic or partition is stopped.
<li>A certain topic or partition is added, then stopped, but added back again.
</ul>
Race conditions may occur under two of the supported cases:
<ul>
<li>A TopicPartition is removed, but added back again.
<li>A TopicPartition is stopped, then reading from it is resumed.
</ul>
When a race condition occurs, the stopped/removed TopicPartition may fail to be re-emitted to
<code>ReadFromKafkaDoFn</code>, or <code>ReadFromKafkaDoFn</code> may output duplicated records. The
major cause of such race conditions is that both <code>WatchKafkaTopicPartitionDoFn</code> and <code>ReadFromKafkaDoFn</code> react to the signal from a removed/stopped <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/TopicPartition.html?is-external=true" title="class or interface in org.apache.kafka.common"><code>TopicPartition</code></a>, but there is no
guarantee that both DoFns perform the related actions at the same time.
<p>Here is one example of failing to emit a newly added <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/TopicPartition.html?is-external=true" title="class or interface in org.apache.kafka.common"><code>TopicPartition</code></a>:
<ul>
<li>A <code>WatchKafkaTopicPartitionDoFn</code> is configured to update the current tracking set
every hour.
<li>TopicPartition A is tracked by the <code>WatchKafkaTopicPartitionDoFn</code> at 10:00AM and
<code>ReadFromKafkaDoFn</code> starts to read from TopicPartition A immediately.
<li>At 10:30AM, the <code>WatchKafkaTopicPartitionDoFn</code> notices that the <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/TopicPartition.html?is-external=true" title="class or interface in org.apache.kafka.common"><code>TopicPartition</code></a> has been stopped/removed, so it stops reading from it and returns <code>ProcessContinuation.stop()</code>.
<li>At 10:45AM, the pipeline author wants to read from TopicPartition A again.
<li>At 11:00AM, when <code>WatchKafkaTopicPartitionDoFn</code> is invoked by the firing timer, it doesn't
know that TopicPartition A was ever stopped/removed. As far as it knows, TopicPartition A
is still an active TopicPartition, so it will not emit TopicPartition A again.
</ul>
Another race condition example, which produces duplicate records:
<ul>
<li>At 10:00AM, <code>ReadFromKafkaDoFn</code> is processing TopicPartition A.
<li>At 10:05AM, <code>ReadFromKafkaDoFn</code> starts to process other TopicPartitions (an sdf-initiated
checkpoint or runner-issued checkpoint happens).
<li>At 10:10AM, <code>WatchKafkaTopicPartitionDoFn</code> learns that TopicPartition A has been
stopped/removed.
<li>At 10:15AM, <code>WatchKafkaTopicPartitionDoFn</code> learns that TopicPartition A has been added again
and emits TopicPartition A again.
<li>At 10:20AM, <code>ReadFromKafkaDoFn</code> starts to process the resumed TopicPartition A, but at the
same time it is also processing the newly emitted TopicPartition A.
</ul>
For more design details, please refer to
https://docs.google.com/document/d/1FU3GxVRetHPLVizP3Mdv6mP5tpjZ3fd99qNjUI5DT5k/. To enable
dynamic read, you can write a pipeline like:
<pre><code>
pipeline
.apply(KafkaIO.&lt;Long, String&gt;read()
// Configure dynamic read with a 1 hour interval: the pipeline will check for available
// TopicPartitions and emit newly added ones every hour.
.withDynamicRead(Duration.standardHours(1))
.withCheckStopReadingFn(new SerializableFunction&lt;TopicPartition, Boolean&gt;() {})
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
)
.apply(Values.&lt;String&gt;create()) // PCollection&lt;String&gt;
...
</code></pre>
<h3>Partition Assignment and Checkpointing</h3>
The Kafka partitions are evenly distributed among splits (workers).
<p>Checkpointing is fully supported, and each split can resume from the previous checkpoint (to the
extent supported by the runner). See <code>KafkaUnboundedSource.split(int, PipelineOptions)</code> for
more details on splits and checkpoint support.
<p>When the pipeline starts for the first time, or without any checkpoint, the source starts
consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
beginning by setting properties appropriately in <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/consumer/ConsumerConfig.html?is-external=true" title="class or interface in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a>, through <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withConsumerConfigUpdates-java.util.Map-"><code>KafkaIO.Read.withConsumerConfigUpdates(Map)</code></a>. You can also enable offset auto-commit in Kafka to resume
from the last committed offsets.
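<p>For example, a minimal sketch of starting from the earliest available offsets via the consumer config; the topic and broker names are illustrative:
<pre><code>
 pipeline
   .apply(KafkaIO.&lt;Long, String&gt;read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      // start from the beginning of the topic when no checkpoint or committed offset exists
      .withConsumerConfigUpdates(
          ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
   )
   ...
</code></pre>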
<p>In summary, KafkaIO.read follows the sequence below to set the initial offset:<br>
1. <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaCheckpointMark</code></a> provided by the runner;<br>
2. Consumer offset stored in Kafka when <code>ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true</code>;
<br>
3. Start from the <em>latest</em> offset by default.
<p>Seeking to the initial offset is a blocking operation in the Kafka API, and it can block forever with
certain versions of the Kafka client library. This was resolved by <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior">KIP-266</a>,
which provides the `default.api.timeout.ms` consumer config setting to control such timeouts.
KafkaIO.read implements a timeout itself so as not to block forever in case an older Kafka client is
used. It recognizes the `default.api.timeout.ms` setting and will honor the timeout value if it
is passed in the consumer config.
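<p>For example, a minimal sketch of raising that timeout through the consumer config; the 60 second value is illustrative only:
<pre><code>
 KafkaIO.&lt;Long, String&gt;read()
    ...
    // raise the consumer API timeout used while seeking to the initial offset
    .withConsumerConfigUpdates(ImmutableMap.of("default.api.timeout.ms", "60000"))
    ...
</code></pre>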
<h3>Use Avro schema with Confluent Schema Registry</h3>
<p>If you want to deserialize the keys and/or values based on a schema available in Confluent
Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
for deserialization. A <a href="../../../../../../org/apache/beam/sdk/coders/Coder.html" title="class in org.apache.beam.sdk.coders"><code>Coder</code></a> will be inferred automatically based on the respective
<a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/serialization/Deserializer.html?is-external=true" title="class or interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>.
<p>For an Avro schema it will return a <a href="../../../../../../org/apache/beam/sdk/values/PCollection.html" title="class in org.apache.beam.sdk.values"><code>PCollection</code></a> of <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaRecord.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaRecord</code></a>s where the key
and/or value will be typed as <a href="https://static.javadoc.io/org.apache.avro/avro/1.8.2/org/apache/avro/generic/GenericRecord.html?is-external=true" title="class or interface in org.apache.avro.generic"><code>GenericRecord</code></a>. In this case, users
don't need to specify key and/or value deserializers and coders, since they will be set to <code>KafkaAvroDeserializer</code> and <a href="../../../../../../org/apache/beam/sdk/coders/AvroCoder.html" title="class in org.apache.beam.sdk.coders"><code>AvroCoder</code></a> by default.
<p>For example, in the snippet below topic values are serialized with an Avro schema stored in the Schema
Registry, and keys are typed as <code>Long</code>:
<pre><code>
PCollection&lt;KafkaRecord&lt;Long, GenericRecord&gt;&gt; input = pipeline
.apply(KafkaIO.&lt;Long, GenericRecord&gt;read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
// Use Confluent Schema Registry, specify schema registry URL and value subject
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
...
</code></pre>
<p>You can also pass properties to the schema registry client, allowing you to configure
authentication:
<pre><code>
ImmutableMap&lt;String, Object&gt; csrConfig =
ImmutableMap.&lt;String, Object&gt;builder()
.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE,"USER_INFO")
.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG,"&lt;username&gt;:&lt;password&gt;")
.build();
PCollection&lt;KafkaRecord&lt;Long, GenericRecord&gt;&gt; input = pipeline
.apply(KafkaIO.&lt;Long, GenericRecord&gt;read()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic")
.withKeyDeserializer(LongDeserializer.class)
// Use Confluent Schema Registry, specify schema registry URL, value subject and schema registry client configuration
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of("https://localhost:8081", "my_topic-value", null, csrConfig))
...
</code></pre>
<h2>Read from Kafka as a <a href="../../../../../../org/apache/beam/sdk/transforms/DoFn.html" title="class in org.apache.beam.sdk.transforms"><code>DoFn</code></a></h2>
<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> is the <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a> that takes a PCollection of <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaSourceDescriptor</code></a> as input and outputs a PCollection of <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaRecord.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaRecord</code></a>. The core
implementation is based on <code>SplittableDoFn</code>. For more details about the concept of <code>SplittableDoFn</code>, please refer to the <a
href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Read</code></a> is, <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> doesn't require source descriptions(e.g., <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getTopicPartitions--"><code>KafkaIO.Read.getTopicPartitions()</code></a>, <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getTopics--"><code>KafkaIO.Read.getTopics()</code></a>, <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getStartReadTime--"><code>KafkaIO.Read.getStartReadTime()</code></a>, etc.) during the pipeline construction time. Instead, the
pipeline can populate these source descriptions during runtime. For example, the pipeline can
query Kafka topics from a BigQuery table and read these topics via <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a>.
<h3>Common Kafka Consumer Configurations</h3>
<p>Most Kafka consumer configurations are similar to <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Read</code></a>:
<ul>
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getConsumerConfig--"><code>KafkaIO.ReadSourceDescriptors.getConsumerConfig()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getConsumerConfig--"><code>KafkaIO.Read.getConsumerConfig()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getConsumerFactoryFn--"><code>KafkaIO.ReadSourceDescriptors.getConsumerFactoryFn()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getConsumerFactoryFn--"><code>KafkaIO.Read.getConsumerFactoryFn()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getOffsetConsumerConfig--"><code>KafkaIO.ReadSourceDescriptors.getOffsetConsumerConfig()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getOffsetConsumerConfig--"><code>KafkaIO.Read.getOffsetConsumerConfig()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getKeyCoder--"><code>KafkaIO.ReadSourceDescriptors.getKeyCoder()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getKeyCoder--"><code>KafkaIO.Read.getKeyCoder()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getValueCoder--"><code>KafkaIO.ReadSourceDescriptors.getValueCoder()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getValueCoder--"><code>KafkaIO.Read.getValueCoder()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getKeyDeserializerProvider--"><code>KafkaIO.ReadSourceDescriptors.getKeyDeserializerProvider()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getKeyDeserializerProvider--"><code>KafkaIO.Read.getKeyDeserializerProvider()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#getValueDeserializerProvider--"><code>KafkaIO.ReadSourceDescriptors.getValueDeserializerProvider()</code></a> is the same as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#getValueDeserializerProvider--"><code>KafkaIO.Read.getValueDeserializerProvider()</code></a>.
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#isCommitOffsetEnabled--"><code>KafkaIO.ReadSourceDescriptors.isCommitOffsetEnabled()</code></a> has the same meaning as <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--"><code>KafkaIO.Read.isCommitOffsetsInFinalizeEnabled()</code></a>.
</ul>
<p>For example, to create a basic <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> transform:
<pre><code>
pipeline
 .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1))))
 .apply(KafkaIO.readSourceDescriptors()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));
</code></pre>
Note that the <code>bootstrapServers</code> can also be populated from the <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaSourceDescriptor</code></a>:
<pre><code>
pipeline
 .apply(Create.of(
    KafkaSourceDescriptor.of(
      new TopicPartition("topic", 1),
      null,
      null,
      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
 .apply(KafkaIO.readSourceDescriptors()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));
</code></pre>
<h3>Configurations of <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a></h3>
<p>Besides the configurations of the Kafka consumer, there are some other configurations related
to processing records.
<p><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#commitOffsets--"><code>KafkaIO.ReadSourceDescriptors.commitOffsets()</code></a> enables committing offsets after processing each
record. Note that if <code>isolation.level</code> is set to "read_committed" or <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/consumer/ConsumerConfig.html?is-external=true#ENABLE_AUTO_COMMIT_CONFIG" title="class or interface in org.apache.kafka.clients.consumer"><code>ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG</code></a> is set in the consumer config, then <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#commitOffsets--"><code>KafkaIO.ReadSourceDescriptors.commitOffsets()</code></a> will be ignored.
<p><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#withExtractOutputTimestampFn-org.apache.beam.sdk.transforms.SerializableFunction-"><code>KafkaIO.ReadSourceDescriptors.withExtractOutputTimestampFn(SerializableFunction)</code></a> is used to
compute the <code>output timestamp</code> for a given <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaRecord.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaRecord</code></a> and controls the watermark
advancement. There are three built-in types:
<ul>
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#withProcessingTime--"><code>KafkaIO.ReadSourceDescriptors.withProcessingTime()</code></a>
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#withCreateTime--"><code>KafkaIO.ReadSourceDescriptors.withCreateTime()</code></a>
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#withLogAppendTime--"><code>KafkaIO.ReadSourceDescriptors.withLogAppendTime()</code></a>
</ul>
<p>For example, to create a <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> with this additional configuration:
<pre><code>
pipeline
 .apply(Create.of(
    KafkaSourceDescriptor.of(
      new TopicPartition("topic", 1),
      null,
      null,
      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
 .apply(KafkaIO.readSourceDescriptors()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withProcessingTime()
        .commitOffsets());
</code></pre>
<h3>Writing to Kafka</h3>
<p>The KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the
values, or native Kafka producer records using <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/producer/ProducerRecord.html?is-external=true" title="class or interface in org.apache.kafka.clients.producer"><code>ProducerRecord</code></a>. To configure a Kafka sink, you must specify at
a minimum the Kafka <tt>bootstrapServers</tt>, the topic to write to, and key and value
serializers. For example:
<pre><code>
PCollection&lt;KV&lt;Long, String&gt;&gt; kvColl = ...;
kvColl.apply(KafkaIO.&lt;Long, String&gt;write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
// You can further customize KafkaProducer used to write the records by adding more
// settings for ProducerConfig. e.g, to enable compression :
.withProducerConfigUpdates(ImmutableMap.of("compression.type", "gzip"))
// You can set the publish timestamp for the Kafka records.
.withInputTimestamp() // element timestamp is used while publishing to Kafka
// or you can also set a custom timestamp with a function.
.withPublishTimestampFunction((elem, elemTs) -&gt; ...)
// Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
.withEOS(20, "eos-sink-group-id")
);
</code></pre>
<p>To produce Avro values, you can use the class <code>KafkaAvroSerializer</code>. To make this class work with <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#write--"><code>write()</code></a> and the method <code>withValueSerializer()</code>, make sure to erase the generic types by casting
to <code>(Class)</code>, as shown in the following example:
<pre><code>
KafkaIO.&lt;Long, String&gt;write()
...
.withValueSerializer((Class)KafkaAvroSerializer.class)
.withProducerConfigUpdates( &lt;Map with schema registry configuration details&gt; )
...
</code></pre>
<p>Often you might want to write just values, without any keys, to Kafka. Use <code>values()</code> to
write records with the default empty (null) key:
<pre><code>
PCollection&lt;String&gt; strings = ...;
strings.apply(KafkaIO.&lt;Void, String&gt;write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withValueSerializer(StringSerializer.class) // just need serializer for value
.values()
);
</code></pre>
<p>Also, if you want to write Kafka <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/producer/ProducerRecord.html?is-external=true" title="class or interface in org.apache.kafka.clients.producer"><code>ProducerRecord</code></a>s, then you should use <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#writeRecords--"><code>writeRecords()</code></a>:
<pre><code>
PCollection&lt;ProducerRecord&lt;Long, String&gt;&gt; records = ...;
records.apply(KafkaIO.&lt;Long, String&gt;writeRecords()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class)
);
</code></pre>
<h3>Advanced Kafka Configuration</h3>
KafkaIO allows setting most of the properties in <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/consumer/ConsumerConfig.html?is-external=true" title="class or interface in org.apache.kafka.clients.consumer"><code>ConsumerConfig</code></a> for the source or in <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/clients/producer/ProducerConfig.html?is-external=true" title="class or interface in org.apache.kafka.clients.producer"><code>ProducerConfig</code></a> for the sink. For example, if you would like to enable offset <em>auto commit</em> (for
external monitoring or other purposes), you can set <tt>"group.id"</tt>,
<tt>"enable.auto.commit"</tt>, etc.
<h3>Event Timestamps and Watermark</h3>
By default, the record timestamp (event time) is set to processing time in the KafkaIO reader, and the
source watermark is the current wall time. If a topic has the Kafka server-side ingestion timestamp enabled
('LogAppendTime'), it can be enabled with <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withLogAppendTime--"><code>KafkaIO.Read.withLogAppendTime()</code></a>. A custom timestamp
policy can be provided by implementing <a href="../../../../../../org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html" title="interface in org.apache.beam.sdk.io.kafka"><code>TimestampPolicyFactory</code></a>. See <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-"><code>KafkaIO.Read.withTimestampPolicyFactory(TimestampPolicyFactory)</code></a> for more information.
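<p>For example, a sketch of a custom policy using <code>CustomTimestampPolicyWithLimitedDelay</code>, assuming the event time is carried in the Kafka record timestamp and that a 5 minute maximum delay is acceptable:
<pre><code>
 KafkaIO.&lt;Long, String&gt;read()
    ...
    .withTimestampPolicyFactory(
        (tp, previousWatermark) -&gt;
            new CustomTimestampPolicyWithLimitedDelay&lt;&gt;(
                // extract the event time from each record; here, the Kafka record timestamp
                record -&gt; new Instant(record.getTimestamp()),
                // upper bound on how late records may arrive relative to the watermark
                Duration.standardMinutes(5),
                previousWatermark))
    ...
</code></pre>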
<h3>Supported Kafka Client Versions</h3>
KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
<i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions 0.9.x
- 0.10.0.0 are also supported, but are deprecated and likely to be removed in the near future. Please
ensure that the version included with the application is compatible with the version of your
Kafka cluster. The Kafka client usually fails to initialize with a clear error message in case of
incompatibility.</div>
</li>
</ul>
</div>
<div class="summary">
<ul class="blockList">
<li class="blockList">
<!-- ======== NESTED CLASS SUMMARY ======== -->
<ul class="blockList">
<li class="blockList"><a name="nested.class.summary">
<!-- -->
</a>
<h3>Nested Class Summary</h3>
<table class="memberSummary" border="0" cellpadding="3" cellspacing="0" summary="Nested Class Summary table, listing nested classes, and an explanation">
<caption><span>Nested Classes</span><span class="tabEnd">&nbsp;</span></caption>
<tr>
<th class="colFirst" scope="col">Modifier and Type</th>
<th class="colLast" scope="col">Class and Description</th>
</tr>
<tr class="altColor">
<td class="colFirst"><code>static class&nbsp;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Read</a>&lt;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="type parameter in KafkaIO.Read">K</a>,<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="type parameter in KafkaIO.Read">V</a>&gt;</span></code>
<div class="block">A <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a> to read from Kafka topics.</div>
</td>
</tr>
<tr class="rowColor">
<td class="colFirst"><code>static class&nbsp;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.ReadSourceDescriptors</a>&lt;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="type parameter in KafkaIO.ReadSourceDescriptors">K</a>,<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="type parameter in KafkaIO.ReadSourceDescriptors">V</a>&gt;</span></code>
<div class="block">A <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a> to read from <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaSourceDescriptor</code></a>.</div>
</td>
</tr>
<tr class="altColor">
<td class="colFirst"><code>static class&nbsp;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.TypedWithoutMetadata.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.TypedWithoutMetadata</a>&lt;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.TypedWithoutMetadata.html" title="type parameter in KafkaIO.TypedWithoutMetadata">K</a>,<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.TypedWithoutMetadata.html" title="type parameter in KafkaIO.TypedWithoutMetadata">V</a>&gt;</span></code>
<div class="block">A <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a> to read from Kafka topics.</div>
</td>
</tr>
<tr class="rowColor">
<td class="colFirst"><code>static class&nbsp;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Write</a>&lt;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="type parameter in KafkaIO.Write">K</a>,<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="type parameter in KafkaIO.Write">V</a>&gt;</span></code>
<div class="block">A <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a> to write to a Kafka topic with KVs .</div>
</td>
</tr>
<tr class="altColor">
<td class="colFirst"><code>static class&nbsp;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.WriteRecords</a>&lt;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="type parameter in KafkaIO.WriteRecords">K</a>,<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="type parameter in KafkaIO.WriteRecords">V</a>&gt;</span></code>
<div class="block">A <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a> to write to a Kafka topic with ProducerRecord's.</div>
</td>
</tr>
</table>
</li>
</ul>
<!-- ========== METHOD SUMMARY =========== -->
<ul class="blockList">
<li class="blockList"><a name="method.summary">
<!-- -->
</a>
<h3>Method Summary</h3>
<table class="memberSummary" border="0" cellpadding="3" cellspacing="0" summary="Method Summary table, listing methods, and an explanation">
<caption><span id="t0" class="activeTableTab"><span>All Methods</span><span class="tabEnd">&nbsp;</span></span><span id="t1" class="tableTab"><span><a href="javascript:show(1);">Static Methods</a></span><span class="tabEnd">&nbsp;</span></span><span id="t4" class="tableTab"><span><a href="javascript:show(8);">Concrete Methods</a></span><span class="tabEnd">&nbsp;</span></span></caption>
<tr>
<th class="colFirst" scope="col">Modifier and Type</th>
<th class="colLast" scope="col">Method and Description</th>
</tr>
<tr id="i0" class="altColor">
<td class="colFirst"><code>static &lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Read</a>&lt;K,V&gt;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#read--">read</a></span>()</code>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Read</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>.</div>
</td>
</tr>
<tr id="i1" class="rowColor">
<td class="colFirst"><code>static <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Read</a>&lt;byte[],byte[]&gt;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#readBytes--">readBytes</a></span>()</code>
<div class="block">A specific instance of uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#read--"><code>read()</code></a> where key and values are bytes.</div>
</td>
</tr>
<tr id="i2" class="altColor">
<td class="colFirst"><code>static &lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.ReadSourceDescriptors</a>&lt;K,V&gt;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#readSourceDescriptors--">readSourceDescriptors</a></span>()</code>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>.</div>
</td>
</tr>
<tr id="i3" class="rowColor">
<td class="colFirst"><code>static &lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Write</a>&lt;K,V&gt;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#write--">write</a></span>()</code>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Write</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>.</div>
</td>
</tr>
<tr id="i4" class="altColor">
<td class="colFirst"><code>static &lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.WriteRecords</a>&lt;K,V&gt;</code></td>
<td class="colLast"><code><span class="memberNameLink"><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#writeRecords--">writeRecords</a></span>()</code>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.WriteRecords</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>.</div>
</td>
</tr>
</table>
<ul class="blockList">
<li class="blockList"><a name="methods.inherited.from.class.java.lang.Object">
<!-- -->
</a>
<h3>Methods inherited from class&nbsp;java.lang.Object</h3>
<code>clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait</code></li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
<div class="details">
<ul class="blockList">
<li class="blockList">
<!-- ============ METHOD DETAIL ========== -->
<ul class="blockList">
<li class="blockList"><a name="method.detail">
<!-- -->
</a>
<h3>Method Detail</h3>
<a name="readBytes--">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>readBytes</h4>
<pre>public static&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Read</a>&lt;byte[],byte[]&gt;&nbsp;readBytes()</pre>
<div class="block">A specific instance of uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.html#read--"><code>read()</code></a> where key and values are bytes. See
#read().</div>
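<p>For example, a minimal sketch of reading raw bytes; the broker and topic names are illustrative:
<pre><code>
 PCollection&lt;KafkaRecord&lt;byte[], byte[]&gt;&gt; raw = pipeline
   .apply(KafkaIO.readBytes()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic"));
</code></pre>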
</li>
</ul>
<a name="read--">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>read</h4>
<pre>public static&nbsp;&lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Read</a>&lt;K,V&gt;&nbsp;read()</pre>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Read</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>. Before use, basic Kafka configuration
should set with <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withBootstrapServers-java.lang.String-"><code>KafkaIO.Read.withBootstrapServers(String)</code></a> and <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTopics-java.util.List-"><code>KafkaIO.Read.withTopics(List)</code></a>.
Other optional settings include key and value <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/serialization/Deserializer.html?is-external=true" title="class or interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>s, custom timestamp,
watermark functions.</div>
</li>
</ul>
<a name="readSourceDescriptors--">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>readSourceDescriptors</h4>
<pre>public static&nbsp;&lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.ReadSourceDescriptors</a>&lt;K,V&gt;&nbsp;readSourceDescriptors()</pre>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>. Different from
<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Read</code></a>, setting up <code>topics</code> and <code>bootstrapServers</code> is not required during
construction time. But the <code>bootstrapServers</code> still can be configured <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html#withBootstrapServers-java.lang.String-"><code>KafkaIO.ReadSourceDescriptors.withBootstrapServers(String)</code></a>. Please refer to <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.ReadSourceDescriptors.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.ReadSourceDescriptors</code></a> for more details.</div>
</li>
</ul>
<a name="write--">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>write</h4>
<pre>public static&nbsp;&lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.Write</a>&lt;K,V&gt;&nbsp;write()</pre>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.Write</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>. Before use, Kafka configuration
should be set with <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withBootstrapServers-java.lang.String-"><code>KafkaIO.Write.withBootstrapServers(String)</code></a> and <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withTopic-java.lang.String-"><code>KafkaIO.Write.withTopic(java.lang.String)</code></a> along
with <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/serialization/Deserializer.html?is-external=true" title="class or interface in org.apache.kafka.common.serialization"><code>Deserializer</code></a>s for (optional) key and values.</div>
</li>
</ul>
<a name="writeRecords--">
<!-- -->
</a>
<ul class="blockListLast">
<li class="blockList">
<h4>writeRecords</h4>
<pre>public static&nbsp;&lt;K,V&gt;&nbsp;<a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="class in org.apache.beam.sdk.io.kafka">KafkaIO.WriteRecords</a>&lt;K,V&gt;&nbsp;writeRecords()</pre>
<div class="block">Creates an uninitialized <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html" title="class in org.apache.beam.sdk.io.kafka"><code>KafkaIO.WriteRecords</code></a> <a href="../../../../../../org/apache/beam/sdk/transforms/PTransform.html" title="class in org.apache.beam.sdk.transforms"><code>PTransform</code></a>. Before use, Kafka
configuration should be set with <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withBootstrapServers-java.lang.String-"><code>KafkaIO.WriteRecords.withBootstrapServers(String)</code></a> and <a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withTopic-java.lang.String-"><code>KafkaIO.WriteRecords.withTopic(java.lang.String)</code></a> along with <a href="https://static.javadoc.io/org.apache.kafka/kafka-clients/2.4.1/org/apache/kafka/common/serialization/Serializer.html?is-external=true" title="class or interface in org.apache.kafka.common.serialization"><code>Serializer</code></a>s for the (optional) key and for values.</div>
</li>
</ul>
</li>
</ul>
</li>
</ul>
</div>
</div>
<!-- ========= END OF CLASS DATA ========= -->
<!-- ======= START OF BOTTOM NAVBAR ====== -->
<div class="bottomNav"><a name="navbar.bottom">
<!-- -->
</a>
<div class="skipNav"><a href="#skip.navbar.bottom" title="Skip navigation links">Skip navigation links</a></div>
<a name="navbar.bottom.firstrow">
<!-- -->
</a>
<ul class="navList" title="Navigation">
<li><a href="../../../../../../overview-summary.html">Overview</a></li>
<li><a href="package-summary.html">Package</a></li>
<li class="navBarCell1Rev">Class</li>
<li><a href="package-tree.html">Tree</a></li>
<li><a href="../../../../../../deprecated-list.html">Deprecated</a></li>
<li><a href="../../../../../../index-all.html">Index</a></li>
<li><a href="../../../../../../help-doc.html">Help</a></li>
</ul>
</div>
<div class="subNav">
<ul class="navList">
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaCommitOffset.html" title="class in org.apache.beam.sdk.io.kafka"><span class="typeNameLink">Prev&nbsp;Class</span></a></li>
<li><a href="../../../../../../org/apache/beam/sdk/io/kafka/KafkaIO.Read.html" title="class in org.apache.beam.sdk.io.kafka"><span class="typeNameLink">Next&nbsp;Class</span></a></li>
</ul>
<ul class="navList">
<li><a href="../../../../../../index.html?org/apache/beam/sdk/io/kafka/KafkaIO.html" target="_top">Frames</a></li>
<li><a href="KafkaIO.html" target="_top">No&nbsp;Frames</a></li>
</ul>
<ul class="navList" id="allclasses_navbar_bottom">
<li><a href="../../../../../../allclasses-noframe.html">All&nbsp;Classes</a></li>
</ul>
<div>
<script type="text/javascript"><!--
allClassesLink = document.getElementById("allclasses_navbar_bottom");
if(window==top) {
allClassesLink.style.display = "block";
}
else {
allClassesLink.style.display = "none";
}
//-->
</script>
</div>
<div>
<ul class="subNavList">
<li>Summary:&nbsp;</li>
<li><a href="#nested.class.summary">Nested</a>&nbsp;|&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.summary">Method</a></li>
</ul>
<ul class="subNavList">
<li>Detail:&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.detail">Method</a></li>
</ul>
</div>
<a name="skip.navbar.bottom">
<!-- -->
</a></div>
<!-- ======== END OF BOTTOM NAVBAR ======= -->
</body>
</html>