<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Kafka source connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Kafka source connector pulls messages from Kafka topics and persists the messages"/><meta name="docsearch:version" content="2.8.2"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Kafka source connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Kafka source connector pulls messages from Kafka topics and persists the messages"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" 
alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.8.2</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/2.8.2/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/2.8.2/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.8.2/io-kafka-source">日本語</a></li><li><a href="/docs/fr/2.8.2/io-kafka-source">Français</a></li><li><a href="/docs/ko/2.8.2/io-kafka-source">한국어</a></li><li><a href="/docs/zh-CN/2.8.2/io-kafka-source">中文</a></li><li><a href="/docs/zh-TW/2.8.2/io-kafka-source">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><h1 id="__docusaurus" class="postHeaderTitle">Kafka source connector</h1></header><article><div><span><p>The Kafka source connector pulls messages from Kafka topics and persists the messages
to Pulsar topics.</p>
<p>This guide explains how to configure and use the Kafka source connector.</p>
<h2><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h2>
<p>The configuration of the Kafka source connector has the following properties.</p>
<h3><a class="anchor" aria-hidden="true" id="property"></a><a href="#property" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Property</h3>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>bootstrapServers</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster.</td></tr>
<tr><td><code>groupId</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>A unique string that identifies the group of consumer processes to which this consumer belongs.</td></tr>
<tr><td><code>fetchMinBytes</code></td><td>long</td><td>false</td><td>1</td><td>The minimum number of bytes expected for each fetch response.</td></tr>
<tr><td><code>autoCommitEnabled</code></td><td>boolean</td><td>false</td><td>true</td><td>If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins.</td></tr>
<tr><td><code>autoCommitIntervalMs</code></td><td>long</td><td>false</td><td>5000</td><td>The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>autoCommitEnabled</code> is set to true.</td></tr>
<tr><td><code>heartbeatIntervalMs</code></td><td>long</td><td>false</td><td>3000</td><td>The expected interval between heartbeats sent to the consumer coordinator when using Kafka's group management facilities. <br/><br/><strong>Note: <code>heartbeatIntervalMs</code> must be smaller than <code>sessionTimeoutMs</code></strong>.</td></tr>
<tr><td><code>sessionTimeoutMs</code></td><td>long</td><td>false</td><td>30000</td><td>The timeout used to detect consumer failures when using Kafka's group management facility.</td></tr>
<tr><td><code>topic</code></td><td>String</td><td>true</td><td>&quot; &quot; (empty string)</td><td>The Kafka topic from which messages are pulled and sent to Pulsar.</td></tr>
<tr><td><code>consumerConfigProperties</code></td><td>Map</td><td>false</td><td>&quot; &quot; (empty string)</td><td>The consumer configuration properties to be passed to consumers. <br/><br/><strong>Note: other properties specified in the connector configuration file take precedence over this configuration</strong>.</td></tr>
<tr><td><code>keyDeserializationClass</code></td><td>String</td><td>false</td><td>org.apache.kafka.common.serialization.StringDeserializer</td><td>The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of <a href="https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java"><code>KafkaAbstractSource</code></a>.</td></tr>
<tr><td><code>valueDeserializationClass</code></td><td>String</td><td>false</td><td>org.apache.kafka.common.serialization.ByteArrayDeserializer</td><td>The deserializer class for Kafka consumers to deserialize values.</td></tr>
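<tr><td><code>autoOffsetReset</code></td><td>String</td><td>false</td><td>&quot;earliest&quot;</td><td>The offset reset policy, applied when the consumer group has no committed offset or the committed offset is out of range. Kafka accepts <code>earliest</code>, <code>latest</code>, and <code>none</code>.</td></tr>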
</tbody>
</table>
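<p>The constraints in the table above can be checked before a configuration is handed to the connector. The following is a minimal sketch, not part of Pulsar: <code>validate_kafka_source_config</code>, <code>REQUIRED</code>, and <code>DEFAULTS</code> are hypothetical names introduced for illustration, and the defaults are copied from the table.</p>

```python
# Hypothetical helper (not a Pulsar API): checks a Kafka source config dict
# against the required properties and defaults listed in the table above.
REQUIRED = ("bootstrapServers", "groupId", "topic")

# Defaults copied from the property table.
DEFAULTS = {
    "fetchMinBytes": 1,
    "autoCommitEnabled": True,
    "autoCommitIntervalMs": 5000,
    "heartbeatIntervalMs": 3000,
    "sessionTimeoutMs": 30000,
    "autoOffsetReset": "earliest",
}


def validate_kafka_source_config(config):
    """Return a list of problems found in config; an empty list means none."""
    problems = []
    for name in REQUIRED:
        if not str(config.get(name, "")).strip():
            problems.append(f"missing required property: {name}")
    merged = {**DEFAULTS, **config}
    # Values may arrive as strings (as in the example files), so coerce to int.
    heartbeat = int(merged["heartbeatIntervalMs"])
    session = int(merged["sessionTimeoutMs"])
    if heartbeat >= session:
        problems.append("heartbeatIntervalMs must be smaller than sessionTimeoutMs")
    return problems


config = {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": False,
}
print(validate_kafka_source_config(config))
```

<p>With the default <code>heartbeatIntervalMs</code> of 3000 and a <code>sessionTimeoutMs</code> of 10000, the sketch reports no problems for this configuration.</p>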
<h3><a class="anchor" aria-hidden="true" id="schema-management"></a><a href="#schema-management" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema Management</h3>
<p>The Kafka source connector applies a schema to the Pulsar topic based on the data type that is present on the Kafka topic.
The data type is detected from the <code>keyDeserializationClass</code> and <code>valueDeserializationClass</code> configuration parameters.</p>
<p>If <code>valueDeserializationClass</code> is <code>org.apache.kafka.common.serialization.StringDeserializer</code>, you can set <code>Schema.STRING</code> as the schema type on the Pulsar topic.</p>
<p>If <code>valueDeserializationClass</code> is <code>io.confluent.kafka.serializers.KafkaAvroDeserializer</code>, Pulsar downloads the AVRO schema from the Confluent Schema Registry®
and sets it properly on the Pulsar topic.</p>
<p>In this case, you need to set <code>schema.registry.url</code> inside of the <code>consumerConfigProperties</code> configuration entry
of the source.</p>
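<p>As a rough illustration of where <code>schema.registry.url</code> goes, the sketch below builds such a configuration as a Python dictionary and prints it as JSON. The registry address <code>http://schema-registry:8081</code> is a placeholder assumed for this example, and the broker, group, and topic values are borrowed from the example configuration in this guide.</p>

```python
import json

# Assumed values: the schema registry URL is a placeholder, not a real endpoint.
config = {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "topic": "my-topic",
    "valueDeserializationClass": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    # Extra Kafka consumer properties are nested under consumerConfigProperties.
    "consumerConfigProperties": {
        "schema.registry.url": "http://schema-registry:8081",
    },
}

# Print the configuration in the JSON form accepted as a source config file.
print(json.dumps(config, indent=2))
```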
<p>If <code>keyDeserializationClass</code> is not <code>org.apache.kafka.common.serialization.StringDeserializer</code>, it means
that you do not have a String as key and the Kafka Source uses the KeyValue schema type with the SEPARATED encoding.</p>
<p>Pulsar supports AVRO format for keys.</p>
<p>In this case, you can have a Pulsar topic with the following properties:</p>
<ul>
<li>Schema: KeyValue schema with SEPARATED encoding</li>
<li>Key: the content of key of the Kafka message (base64 encoded)</li>
<li>Value: the content of value of the Kafka message</li>
<li>KeySchema: the schema detected from <code>keyDeserializationClass</code></li>
<li>ValueSchema: the schema detected from <code>valueDeserializationClass</code></li>
</ul>
<p>Topic compaction and partition routing use the Pulsar key, which contains the Kafka key, so they are driven by the same value that you have on Kafka.</p>
<p>When you consume data from Pulsar topics, you can use the <code>KeyValue</code> schema. In this way, you can decode the data properly.
If you want to access the raw key, you can use the <code>Message#getKeyBytes()</code> API.</p>
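<p>The rules in this section can be restated as a small decision function. This is only a sketch of the behavior described above, not the connector's code: <code>describe_value_schema</code>, <code>describe_topic_schema</code>, and <code>decode_kafka_key</code> are hypothetical helpers, and deserializers that this section does not mention are reported as such rather than guessed.</p>

```python
import base64

STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"
AVRO_DESERIALIZER = "io.confluent.kafka.serializers.KafkaAvroDeserializer"


def describe_value_schema(value_class):
    """Map a valueDeserializationClass to the schema named in this section."""
    if value_class == STRING_DESERIALIZER:
        return "STRING"
    if value_class == AVRO_DESERIALIZER:
        return "AVRO (from the schema registry)"
    return "not covered in this section"


def describe_topic_schema(key_class, value_class):
    """Describe the Pulsar topic schema for a key/value deserializer pair."""
    value_schema = describe_value_schema(value_class)
    if key_class != STRING_DESERIALIZER:
        # A non-String key leads to the KeyValue schema with SEPARATED encoding.
        return f"KeyValue(SEPARATED) with value schema {value_schema}"
    return value_schema


def decode_kafka_key(encoded_key):
    """Recover the raw Kafka key bytes from the base64-encoded Pulsar key."""
    return base64.b64decode(encoded_key)


print(describe_topic_schema(STRING_DESERIALIZER, STRING_DESERIALIZER))
print(describe_topic_schema(AVRO_DESERIALIZER, AVRO_DESERIALIZER))
print(decode_kafka_key(base64.b64encode(b"user-42").decode("ascii")))
```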
<h3><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<p>Before using the Kafka source connector, you need to create a configuration file in one of the following formats.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
    <span class="hljs-attr">"bootstrapServers"</span>: <span class="hljs-string">"pulsar-kafka:9092"</span>,
    <span class="hljs-attr">"groupId"</span>: <span class="hljs-string">"test-pulsar-io"</span>,
    <span class="hljs-attr">"topic"</span>: <span class="hljs-string">"my-topic"</span>,
    <span class="hljs-attr">"sessionTimeoutMs"</span>: <span class="hljs-string">"10000"</span>,
    <span class="hljs-attr">"autoCommitEnabled"</span>: <span class="hljs-literal">false</span>
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
    <span class="hljs-attr">bootstrapServers:</span> <span class="hljs-string">"pulsar-kafka:9092"</span>
    <span class="hljs-attr">groupId:</span> <span class="hljs-string">"test-pulsar-io"</span>
    <span class="hljs-attr">topic:</span> <span class="hljs-string">"my-topic"</span>
    <span class="hljs-attr">sessionTimeoutMs:</span> <span class="hljs-string">"10000"</span>
    <span class="hljs-attr">autoCommitEnabled:</span> <span class="hljs-literal">false</span>
</code></pre></li>
</ul>
<h2><a class="anchor" aria-hidden="true" id="usage"></a><a href="#usage" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage</h2>
<p>Here is an example of using the Kafka source connector with the configuration file as shown previously.</p>
<ol>
<li><p>Download a Kafka client and a Kafka connector.</p>
<pre><code class="hljs css language-bash">$ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.8.2/connectors/pulsar-io-kafka-2.8.2.nar
</code></pre></li>
<li><p>Create a network.</p>
<pre><code class="hljs css language-bash">$ docker network create kafka-pulsar
</code></pre></li>
<li><p>Pull a ZooKeeper image and start ZooKeeper.</p>
<pre><code class="hljs css language-bash">$ docker pull wurstmeister/zookeeper
$ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
</code></pre></li>
<li><p>Pull a Kafka image and start Kafka.</p>
<pre><code class="hljs css language-bash">$ docker pull wurstmeister/kafka:2.11-1.0.2
$ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
</code></pre></li>
<li><p>Pull a Pulsar image and start Pulsar standalone.</p>
<pre><code class="hljs css language-bash">$ docker pull apachepulsar/pulsar:2.8.2
$ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v <span class="hljs-variable">$PWD</span>/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.8.2 bin/pulsar standalone
</code></pre></li>
<li><p>Create a producer file <em>kafka-producer.py</em>.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> kafka <span class="hljs-keyword">import</span> KafkaProducer
producer = KafkaProducer(bootstrap_servers=<span class="hljs-string">'pulsar-kafka:9092'</span>)
future = producer.send(<span class="hljs-string">'my-topic'</span>, <span class="hljs-string">b'hello world'</span>)
future.get()
</code></pre></li>
<li><p>Create a consumer file <em>pulsar-client.py</em>.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> pulsar
client = pulsar.Client(<span class="hljs-string">'pulsar://localhost:6650'</span>)
consumer = client.subscribe(<span class="hljs-string">'my-topic'</span>, subscription_name=<span class="hljs-string">'my-aa'</span>)
<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
    msg = consumer.receive()
    <span class="hljs-comment"># Decode the payload so that the text, not the bytes repr, is printed.</span>
    print(<span class="hljs-string">"Received message: '%s'"</span> % msg.data().decode(<span class="hljs-string">'utf-8'</span>))
    consumer.acknowledge(msg)
client.close()
</code></pre></li>
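<li><p>Copy the connector archive, the YAML configuration file created earlier (saved as <em>kafkaSourceConfig.yaml</em>), and the two Python scripts to the Pulsar container.</p>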
<pre><code class="hljs css language-bash">$ docker cp pulsar-io-kafka-2.8.2.nar pulsar-kafka-standalone:/pulsar
$ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
$ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
$ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
</code></pre></li>
<li><p>Open a new terminal window and start the Kafka source connector in local run mode.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -it pulsar-kafka-standalone /bin/bash
$ ./bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--archive ./pulsar-io-kafka-2.8.2.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
--name kafka \
--destination-topic-name my-topic \
--<span class="hljs-built_in">source</span>-config-file ./conf/kafkaSourceConfig.yaml \
--parallelism 1
</code></pre></li>
<li><p>Open a new terminal window, start the consumer in the background, and then run the producer.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -it pulsar-kafka-standalone /bin/bash
$ python3 pulsar-client.py &amp;
$ pip install kafka-python
$ python3 kafka-producer.py
</code></pre>
<p>The consumer prints the following information in the terminal window.</p>
<pre><code class="hljs css language-bash">Received message: <span class="hljs-string">'hello world'</span>
</code></pre></li>
</ol>
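<p>The <code>localrun</code> invocation from the steps above can also be assembled programmatically, which helps when the archive version or topic name changes. The sketch below is an illustration only: <code>build_localrun_command</code> is a hypothetical helper, and it uses just the flags and values that appear in this guide.</p>

```python
import shlex


def build_localrun_command(archive, config_file, destination_topic,
                           name="kafka", tenant="public",
                           namespace="default", parallelism=1):
    """Assemble the pulsar-admin argument list used in the localrun step."""
    return [
        "./bin/pulsar-admin", "source", "localrun",
        "--archive", archive,
        "--classname", "org.apache.pulsar.io.kafka.KafkaBytesSource",
        "--tenant", tenant,
        "--namespace", namespace,
        "--name", name,
        "--destination-topic-name", destination_topic,
        "--source-config-file", config_file,
        "--parallelism", str(parallelism),
    ]


command = build_localrun_command(
    archive="./pulsar-io-kafka-2.8.2.nar",
    config_file="./conf/kafkaSourceConfig.yaml",
    destination_topic="my-topic",
)
# Print the command as a single shell-quoted line for inspection.
print(shlex.join(command))
```

<p>Inside the Pulsar container, the resulting list could be passed to <code>subprocess.run(command, check=True)</code> instead of typing the command by hand.</p>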
</span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#configuration">Configuration</a><ul class="toc-headings"><li><a href="#property">Property</a></li><li><a href="#schema-management">Schema Management</a></li><li><a href="#example">Example</a></li></ul></li><li><a href="#usage">Usage</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>