blob: 09706ca40295a71dd43c6cb664b8c2b3c289c403 [file] [log] [blame]
<!DOCTYPE html>
<html class="writer-html5" lang="en" data-content_root="../../../">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>apache_beam.io.kafka &mdash; Apache Beam 2.67.0 documentation</title>
<link rel="stylesheet" type="text/css" href="../../../_static/pygments.css?v=b86133f3" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/theme.css?v=e59714d7" />
<script src="../../../_static/jquery.js?v=5d32c60e"></script>
<script src="../../../_static/_sphinx_javascript_frameworks_compat.js?v=2cd50e6c"></script>
<script src="../../../_static/documentation_options.js?v=959b4fbe"></script>
<script src="../../../_static/doctools.js?v=9a2dae69"></script>
<script src="../../../_static/sphinx_highlight.js?v=dc90522c"></script>
<script src="../../../_static/js/theme.js"></script>
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home">
Apache Beam
</a>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div><div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="Navigation menu">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.testing.html">apache_beam.testing package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.utils.html">apache_beam.utils package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.yaml.html">apache_beam.yaml package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"><nav class="wy-nav-top" aria-label="Mobile navigation menu" >
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="Page navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html" class="icon icon-home" aria-label="Home"></a></li>
<li class="breadcrumb-item"><a href="../../index.html">Module code</a></li>
<li class="breadcrumb-item active">apache_beam.io.kafka</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.kafka</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;Unbounded source and sink transforms for</span>
<span class="sd"> `Kafka &lt;href=&quot;http://kafka.apache.org/&gt;`_.</span>
<span class="sd"> These transforms are currently supported by Beam portable runners (for</span>
<span class="sd"> example, portable Flink and Spark) as well as Dataflow runner.</span>
<span class="sd"> **Setup**</span>
<span class="sd"> Transforms provided in this module are cross-language transforms</span>
<span class="sd"> implemented in the Beam Java SDK. During the pipeline construction, Python SDK</span>
<span class="sd"> will connect to a Java expansion service to expand these transforms.</span>
<span class="sd"> To facilitate this, a small amount of setup is needed before using these</span>
<span class="sd"> transforms in a Beam Python pipeline.</span>
<span class="sd"> There are several ways to setup cross-language Kafka transforms.</span>
<span class="sd"> * Option 1: use the default expansion service</span>
<span class="sd"> * Option 2: specify a custom expansion service</span>
<span class="sd"> See below for details regarding each of these options.</span>
<span class="sd"> *Option 1: Use the default expansion service*</span>
<span class="sd"> This is the recommended and easiest setup option for using Python Kafka</span>
<span class="sd"> transforms. This option is only available for Beam 2.22.0 and later.</span>
<span class="sd"> This option requires following pre-requisites before running the Beam</span>
<span class="sd"> pipeline.</span>
<span class="sd"> * Install Java runtime in the computer from where the pipeline is constructed</span>
<span class="sd"> and make sure that &#39;java&#39; command is available or set JAVA_HOME environment</span>
<span class="sd"> variable.</span>
<span class="sd"> In this option, Python SDK will either download (for released Beam version) or</span>
<span class="sd"> build (when running from a Beam Git clone) an expansion service jar and use</span>
<span class="sd"> that to expand transforms. Currently Kafka transforms use the</span>
<span class="sd"> &#39;beam-sdks-java-io-expansion-service&#39; jar for this purpose.</span>
<span class="sd"> Note that the KafkaIO read transform can be compiled in two modes</span>
<span class="sd"> * `ReadFromKafkaViaUnbounded` (legacy)</span>
<span class="sd"> * `ReadFromKafkaViaSDF` (default)</span>
<span class="sd"> To use the legacy mode, the `use_deprecated_read` flag should be specified</span>
<span class="sd"> within the IO expansion service. For example,</span>
<span class="sd"> kafka.default_io_expansion_service(</span>
<span class="sd"> append_args=[&quot;--experiments=use_deprecated_read&quot;]</span>
<span class="sd"> )</span>
<span class="sd"> To use the legacy mode using Dataflow runner, the `use_unbounded_sdf_wrapper`</span>
<span class="sd"> flag should be specified within the IO expansion service. For example,</span>
<span class="sd"> kafka.default_io_expansion_service(</span>
<span class="sd"> append_args=[&quot;--experiments=use_unbounded_sdf_wrapper&quot;]</span>
<span class="sd"> )</span>
<span class="sd"> *Option 2: specify a custom expansion service*</span>
<span class="sd"> In this option, you startup your own expansion service and provide that as</span>
<span class="sd"> a parameter when using the transforms provided in this module.</span>
<span class="sd"> This option requires following pre-requisites before running the Beam</span>
<span class="sd"> pipeline.</span>
<span class="sd"> * Startup your own expansion service.</span>
<span class="sd"> * Update your pipeline to provide the expansion service address when</span>
<span class="sd"> initiating Kafka transforms provided in this module.</span>
<span class="sd"> Flink Users can use the built-in Expansion Service of the Flink Runner&#39;s</span>
<span class="sd"> Job Server. If you start Flink&#39;s Job Server, the expansion service will be</span>
<span class="sd"> started on port 8097. For a different address, please set the</span>
<span class="sd"> expansion_service parameter.</span>
<span class="sd"> **More information**</span>
<span class="sd"> For more information regarding cross-language transforms see:</span>
<span class="sd"> - https://beam.apache.org/roadmap/portability/</span>
<span class="sd"> For more information specific to Flink runner see:</span>
<span class="sd"> - https://beam.apache.org/documentation/runners/flink/</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">typing</span>
<span class="kn">import</span><span class="w"> </span><span class="nn">numpy</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="nn">np</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.transforms.external</span><span class="w"> </span><span class="kn">import</span> <span class="n">BeamJarExpansionService</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.transforms.external</span><span class="w"> </span><span class="kn">import</span> <span class="n">ExternalTransform</span>
<span class="kn">from</span><span class="w"> </span><span class="nn">apache_beam.transforms.external</span><span class="w"> </span><span class="kn">import</span> <span class="n">NamedTupleBasedPayloadBuilder</span>
<span class="n">ReadFromKafkaSchema</span> <span class="o">=</span> <span class="n">typing</span><span class="o">.</span><span class="n">NamedTuple</span><span class="p">(</span>
<span class="s1">&#39;ReadFromKafkaSchema&#39;</span><span class="p">,</span>
<span class="p">[</span>
<span class="p">(</span><span class="s1">&#39;consumer_config&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;topics&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;key_deserializer&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">(</span><span class="s1">&#39;value_deserializer&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">(</span><span class="s1">&#39;start_read_time&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;max_num_records&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;max_read_time&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;commit_offset_in_finalize&#39;</span><span class="p">,</span> <span class="nb">bool</span><span class="p">),</span>
<span class="p">(</span><span class="s1">&#39;timestamp_policy&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">(</span><span class="s1">&#39;consumer_polling_timeout&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;redistribute&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;redistribute_num_keys&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="n">np</span><span class="o">.</span><span class="n">int32</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;allow_duplicates&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">bool</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;dynamic_read_poll_interval_seconds&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]),</span>
<span class="p">])</span>
<div class="viewcode-block" id="default_io_expansion_service">
<a class="viewcode-back" href="../../../apache_beam.io.kafka.html#apache_beam.io.kafka.default_io_expansion_service">[docs]</a>
<span class="k">def</span><span class="w"> </span><span class="nf">default_io_expansion_service</span><span class="p">(</span><span class="n">append_args</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">return</span> <span class="n">BeamJarExpansionService</span><span class="p">(</span>
<span class="s1">&#39;sdks:java:io:expansion-service:shadowJar&#39;</span><span class="p">,</span> <span class="n">append_args</span><span class="o">=</span><span class="n">append_args</span><span class="p">)</span></div>
<div class="viewcode-block" id="ReadFromKafka">
<a class="viewcode-back" href="../../../apache_beam.io.kafka.html#apache_beam.io.kafka.ReadFromKafka">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">ReadFromKafka</span><span class="p">(</span><span class="n">ExternalTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> An external PTransform which reads from Kafka and returns a KV pair for</span>
<span class="sd"> each item in the specified Kafka topics. If no Kafka Deserializer for</span>
<span class="sd"> key/value is provided, then the data will be returned as a raw byte array.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Returns the key/value data as raw byte arrays</span>
<span class="n">byte_array_deserializer</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;org.apache.kafka.common.serialization.ByteArrayDeserializer&#39;</span><span class="p">)</span>
<span class="n">processing_time_policy</span> <span class="o">=</span> <span class="s1">&#39;ProcessingTime&#39;</span>
<span class="n">create_time_policy</span> <span class="o">=</span> <span class="s1">&#39;CreateTime&#39;</span>
<span class="n">log_append_time</span> <span class="o">=</span> <span class="s1">&#39;LogAppendTime&#39;</span>
<span class="n">URN_WITH_METADATA</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;beam:transform:org.apache.beam:kafka_read_with_metadata:v1&#39;</span><span class="p">)</span>
<span class="n">URN_WITHOUT_METADATA</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;beam:transform:org.apache.beam:kafka_read_without_metadata:v1&#39;</span><span class="p">)</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">consumer_config</span><span class="p">,</span>
<span class="n">topics</span><span class="p">,</span>
<span class="n">key_deserializer</span><span class="o">=</span><span class="n">byte_array_deserializer</span><span class="p">,</span>
<span class="n">value_deserializer</span><span class="o">=</span><span class="n">byte_array_deserializer</span><span class="p">,</span>
<span class="n">start_read_time</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_num_records</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">max_read_time</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">commit_offset_in_finalize</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">timestamp_policy</span><span class="o">=</span><span class="n">processing_time_policy</span><span class="p">,</span>
<span class="n">consumer_polling_timeout</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span>
<span class="n">with_metadata</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">expansion_service</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">redistribute</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">redistribute_num_keys</span><span class="o">=</span><span class="n">np</span><span class="o">.</span><span class="n">int32</span><span class="p">(</span><span class="mi">0</span><span class="p">),</span>
<span class="n">allow_duplicates</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">dynamic_read_poll_interval_seconds</span><span class="p">:</span> <span class="n">typing</span><span class="o">.</span><span class="n">Optional</span><span class="p">[</span><span class="nb">int</span><span class="p">]</span> <span class="o">=</span> <span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Initializes a read operation from Kafka.</span>
<span class="sd"> :param consumer_config: A dictionary containing the consumer configuration.</span>
<span class="sd"> :param topics: A list of topic strings.</span>
<span class="sd"> :param key_deserializer: A fully-qualified Java class name of a Kafka</span>
<span class="sd"> Deserializer for the topic&#39;s key, e.g.</span>
<span class="sd"> &#39;org.apache.kafka.common.serialization.LongDeserializer&#39;.</span>
<span class="sd"> Default: &#39;org.apache.kafka.common.serialization.ByteArrayDeserializer&#39;.</span>
<span class="sd"> :param value_deserializer: A fully-qualified Java class name of a Kafka</span>
<span class="sd"> Deserializer for the topic&#39;s value, e.g.</span>
<span class="sd"> &#39;org.apache.kafka.common.serialization.LongDeserializer&#39;.</span>
<span class="sd"> Default: &#39;org.apache.kafka.common.serialization.ByteArrayDeserializer&#39;.</span>
<span class="sd"> :param start_read_time: Use timestamp to set up start offset in milliseconds</span>
<span class="sd"> epoch.</span>
<span class="sd"> :param max_num_records: Maximum amount of records to be read. Mainly used</span>
<span class="sd"> for tests and demo applications.</span>
<span class="sd"> :param max_read_time: Maximum amount of time in seconds the transform</span>
<span class="sd"> executes. Mainly used for tests and demo applications.</span>
<span class="sd"> :param commit_offset_in_finalize: Whether to commit offsets when finalizing.</span>
<span class="sd"> :param timestamp_policy: The built-in timestamp policy which is used for</span>
<span class="sd"> extracting timestamp from KafkaRecord.</span>
<span class="sd"> :param consumer_polling_timeout: Kafka client polling request</span>
<span class="sd"> timeout time in seconds. A lower timeout optimizes for latency. Increase</span>
<span class="sd"> the timeout if the consumer is not fetching any records. Default is 2</span>
<span class="sd"> seconds.</span>
<span class="sd"> :param with_metadata: whether the returned PCollection should contain</span>
<span class="sd"> Kafka related metadata or not. If False (default), elements of the</span>
<span class="sd"> returned PCollection will be of type &#39;bytes&#39; if True, elements of the</span>
<span class="sd"> returned PCollection will be of the type &#39;Row&#39;. Note that, currently</span>
<span class="sd"> this only works when using default key and value deserializers where</span>
<span class="sd"> Java Kafka Reader reads keys and values as &#39;byte[]&#39;.</span>
<span class="sd"> :param expansion_service: The address (host:port) of the ExpansionService.</span>
<span class="sd"> :param redistribute: whether a Redistribute transform should be applied</span>
<span class="sd"> immediately after the read.</span>
<span class="sd"> :param redistribute_num_keys: Configures how many keys the Redistribute</span>
<span class="sd"> spreads the data across.</span>
<span class="sd"> :param allow_duplicates: whether the Redistribute transform allows for</span>
<span class="sd"> duplicates (this serves solely as a hint to the underlying runner).</span>
<span class="sd"> :param dynamic_read_poll_interval_seconds: The interval in seconds at which</span>
<span class="sd"> to check for new partitions. If not None, dynamic partition discovery</span>
<span class="sd"> is enabled.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">timestamp_policy</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">[</span><span class="n">ReadFromKafka</span><span class="o">.</span><span class="n">processing_time_policy</span><span class="p">,</span>
<span class="n">ReadFromKafka</span><span class="o">.</span><span class="n">create_time_policy</span><span class="p">,</span>
<span class="n">ReadFromKafka</span><span class="o">.</span><span class="n">log_append_time</span><span class="p">]:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;timestamp_policy should be one of &#39;</span>
<span class="s1">&#39;[ProcessingTime, CreateTime, LogAppendTime]&#39;</span><span class="p">)</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">URN_WITH_METADATA</span> <span class="k">if</span> <span class="n">with_metadata</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">URN_WITHOUT_METADATA</span><span class="p">,</span>
<span class="n">NamedTupleBasedPayloadBuilder</span><span class="p">(</span>
<span class="n">ReadFromKafkaSchema</span><span class="p">(</span>
<span class="n">consumer_config</span><span class="o">=</span><span class="n">consumer_config</span><span class="p">,</span>
<span class="n">topics</span><span class="o">=</span><span class="n">topics</span><span class="p">,</span>
<span class="n">key_deserializer</span><span class="o">=</span><span class="n">key_deserializer</span><span class="p">,</span>
<span class="n">value_deserializer</span><span class="o">=</span><span class="n">value_deserializer</span><span class="p">,</span>
<span class="n">max_num_records</span><span class="o">=</span><span class="n">max_num_records</span><span class="p">,</span>
<span class="n">max_read_time</span><span class="o">=</span><span class="n">max_read_time</span><span class="p">,</span>
<span class="n">start_read_time</span><span class="o">=</span><span class="n">start_read_time</span><span class="p">,</span>
<span class="n">commit_offset_in_finalize</span><span class="o">=</span><span class="n">commit_offset_in_finalize</span><span class="p">,</span>
<span class="n">timestamp_policy</span><span class="o">=</span><span class="n">timestamp_policy</span><span class="p">,</span>
<span class="n">consumer_polling_timeout</span><span class="o">=</span><span class="n">consumer_polling_timeout</span><span class="p">,</span>
<span class="n">redistribute</span><span class="o">=</span><span class="n">redistribute</span><span class="p">,</span>
<span class="n">redistribute_num_keys</span><span class="o">=</span><span class="n">redistribute_num_keys</span><span class="p">,</span>
<span class="n">allow_duplicates</span><span class="o">=</span><span class="n">allow_duplicates</span><span class="p">,</span>
<span class="n">dynamic_read_poll_interval_seconds</span><span class="o">=</span>
<span class="n">dynamic_read_poll_interval_seconds</span><span class="p">)),</span>
<span class="n">expansion_service</span> <span class="ow">or</span> <span class="n">default_io_expansion_service</span><span class="p">())</span></div>
<span class="n">WriteToKafkaSchema</span> <span class="o">=</span> <span class="n">typing</span><span class="o">.</span><span class="n">NamedTuple</span><span class="p">(</span>
<span class="s1">&#39;WriteToKafkaSchema&#39;</span><span class="p">,</span>
<span class="p">[</span>
<span class="p">(</span><span class="s1">&#39;producer_config&#39;</span><span class="p">,</span> <span class="n">typing</span><span class="o">.</span><span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]),</span>
<span class="p">(</span><span class="s1">&#39;topic&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">(</span><span class="s1">&#39;key_serializer&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">(</span><span class="s1">&#39;value_serializer&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span>
<span class="p">])</span>
<div class="viewcode-block" id="WriteToKafka">
<a class="viewcode-back" href="../../../apache_beam.io.kafka.html#apache_beam.io.kafka.WriteToKafka">[docs]</a>
<span class="k">class</span><span class="w"> </span><span class="nc">WriteToKafka</span><span class="p">(</span><span class="n">ExternalTransform</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> An external PTransform which writes KV data to a specified Kafka topic.</span>
<span class="sd"> If no Kafka Serializer for key/value is provided, then key/value are</span>
<span class="sd"> assumed to be byte arrays.</span>
<span class="sd"> Experimental; no backwards compatibility guarantees.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Default serializer which passes raw bytes to Kafka</span>
<span class="n">byte_array_serializer</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;org.apache.kafka.common.serialization.ByteArraySerializer&#39;</span><span class="p">)</span>
<span class="n">URN</span> <span class="o">=</span> <span class="s1">&#39;beam:transform:org.apache.beam:kafka_write:v1&#39;</span>
<span class="k">def</span><span class="w"> </span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">producer_config</span><span class="p">,</span>
<span class="n">topic</span><span class="p">,</span>
<span class="n">key_serializer</span><span class="o">=</span><span class="n">byte_array_serializer</span><span class="p">,</span>
<span class="n">value_serializer</span><span class="o">=</span><span class="n">byte_array_serializer</span><span class="p">,</span>
<span class="n">expansion_service</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Initializes a write operation to Kafka.</span>
<span class="sd"> :param producer_config: A dictionary containing the producer configuration.</span>
<span class="sd"> :param topic: A Kafka topic name.</span>
<span class="sd"> :param key_serializer: A fully-qualified Java class name of a Kafka</span>
<span class="sd"> Serializer for the topic&#39;s key, e.g.</span>
<span class="sd"> &#39;org.apache.kafka.common.serialization.LongSerializer&#39;.</span>
<span class="sd"> Default: &#39;org.apache.kafka.common.serialization.ByteArraySerializer&#39;.</span>
<span class="sd"> :param value_serializer: A fully-qualified Java class name of a Kafka</span>
<span class="sd"> Serializer for the topic&#39;s value, e.g.</span>
<span class="sd"> &#39;org.apache.kafka.common.serialization.LongSerializer&#39;.</span>
<span class="sd"> Default: &#39;org.apache.kafka.common.serialization.ByteArraySerializer&#39;.</span>
<span class="sd"> :param expansion_service: The address (host:port) of the ExpansionService.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">URN</span><span class="p">,</span>
<span class="n">NamedTupleBasedPayloadBuilder</span><span class="p">(</span>
<span class="n">WriteToKafkaSchema</span><span class="p">(</span>
<span class="n">producer_config</span><span class="o">=</span><span class="n">producer_config</span><span class="p">,</span>
<span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">,</span>
<span class="n">key_serializer</span><span class="o">=</span><span class="n">key_serializer</span><span class="p">,</span>
<span class="n">value_serializer</span><span class="o">=</span><span class="n">value_serializer</span><span class="p">,</span>
<span class="p">)),</span>
<span class="n">expansion_service</span> <span class="ow">or</span> <span class="n">default_io_expansion_service</span><span class="p">())</span></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>&#169; Copyright 2025, Apache Beam.</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a
<a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a>
provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>