blob: 3583d58c0b05882a95bb54f608b7f8cb209ccca4 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar adaptor for Apache Kafka · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar provides an easy option for applications that are currently written using the [Apache Kafka](http://kafka.apache.org) Java client API."/><meta name="docsearch:version" content="2.3.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar adaptor for Apache Kafka · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar provides an easy option for applications that are currently written using the [Apache Kafka](http://kafka.apache.org) Java client API."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.3.1</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.3.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.3.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.3.1/adaptors-kafka">日本語</a></li><li><a href="/docs/fr/2.3.1/adaptors-kafka">Français</a></li><li><a href="/docs/ko/2.3.1/adaptors-kafka">한국어</a></li><li><a href="/docs/zh-CN/2.3.1/adaptors-kafka">中文</a></li><li><a href="/docs/zh-TW/2.3.1/adaptors-kafka">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Adaptors</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-metrics">Metrics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-cdc">CDC Connector</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-getting-started">Getting Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.3.1/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/adaptors-kafka.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar adaptor for Apache Kafka</h1></header><article><div><span><p>Pulsar provides an easy option for applications that are currently written using the <a href="http://kafka.apache.org">Apache Kafka</a> Java client API.</p>
<h2><a class="anchor" aria-hidden="true" id="using-the-pulsar-kafka-compatibility-wrapper"></a><a href="#using-the-pulsar-kafka-compatibility-wrapper" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Using the Pulsar Kafka compatibility wrapper</h2>
<p>In an existing application, change the regular Kafka client dependency and replace it with the Pulsar Kafka wrapper. Remove the following dependency in <code>pom.xml</code>:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.kafka<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>kafka-clients<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>0.10.2.1<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<p>Then include this dependency for the Pulsar Kafka wrapper:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-client-kafka<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>2.3.1<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<p>With the new dependency, the existing code works without any changes. You need to adjust the configuration, and make sure it points the
producers and consumers to Pulsar service rather than Kafka, and uses a particular
Pulsar topic.</p>
<h2><a class="anchor" aria-hidden="true" id="using-the-pulsar-kafka-compatibility-wrapper-together-with-existing-kafka-client"></a><a href="#using-the-pulsar-kafka-compatibility-wrapper-together-with-existing-kafka-client" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Using the Pulsar Kafka compatibility wrapper together with existing kafka client</h2>
<p>When migrating from Kafka to Pulsar, the application might use the original kafka client
and the pulsar kafka wrapper together during migration. You should consider using the
unshaded pulsar kafka client wrapper.</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-client-kafka-original<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>2.3.1<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<p>When using this dependency, construct producers using <code>org.apache.kafka.clients.producer.PulsarKafkaProducer</code>
instead of <code>org.apache.kafka.clients.producer.KafkaProducer</code> and <code>org.apache.kafka.clients.producer.PulsarKafkaConsumer</code> for consumers.</p>
<h2><a class="anchor" aria-hidden="true" id="producer-example"></a><a href="#producer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer example</h2>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Topic needs to be a regular Pulsar topic</span>
String topic = <span class="hljs-string">"persistent://public/default/my-topic"</span>;
Properties props = <span class="hljs-keyword">new</span> Properties();
<span class="hljs-comment">// Point to a Pulsar service</span>
props.put(<span class="hljs-string">"bootstrap.servers"</span>, <span class="hljs-string">"pulsar://localhost:6650"</span>);
props.put(<span class="hljs-string">"key.serializer"</span>, IntegerSerializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>;
props.put(<span class="hljs-string">"value.serializer"</span>, StringSerializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>;
Producer&lt;Integer, String&gt; producer = <span class="hljs-keyword">new</span> KafkaProducer&lt;&gt;(props);
<span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i &lt; <span class="hljs-number">10</span>; i++) {
producer.send(<span class="hljs-keyword">new</span> ProducerRecord&lt;Integer, String&gt;(topic, i, <span class="hljs-string">"hello-"</span> + i));
log.info(<span class="hljs-string">"Message {} sent successfully"</span>, i);
}
producer.close();
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="consumer-example"></a><a href="#consumer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer example</h2>
<pre><code class="hljs css language-java">String topic = <span class="hljs-string">"persistent://public/default/my-topic"</span>;
Properties props = <span class="hljs-keyword">new</span> Properties();
<span class="hljs-comment">// Point to a Pulsar service</span>
props.put(<span class="hljs-string">"bootstrap.servers"</span>, <span class="hljs-string">"pulsar://localhost:6650"</span>);
props.put(<span class="hljs-string">"group.id"</span>, <span class="hljs-string">"my-subscription-name"</span>);
props.put(<span class="hljs-string">"enable.auto.commit"</span>, <span class="hljs-string">"false"</span>);
props.put(<span class="hljs-string">"key.deserializer"</span>, IntegerDeserializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>;
props.put(<span class="hljs-string">"value.deserializer"</span>, StringDeserializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>;
Consumer&lt;Integer, String&gt; consumer = <span class="hljs-keyword">new</span> KafkaConsumer&lt;&gt;(props);
consumer.subscribe(Arrays.asList(topic));
<span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
ConsumerRecords&lt;Integer, String&gt; records = consumer.poll(<span class="hljs-number">100</span>);
records.forEach(record -&gt; {
log.info(<span class="hljs-string">"Received record: {}"</span>, record);
});
<span class="hljs-comment">// Commit last offset</span>
consumer.commitSync();
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="complete-examples"></a><a href="#complete-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Complete Examples</h2>
<p>You can find the complete producer and consumer examples
<a href="https://github.com/apache/pulsar/tree/master/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples">here</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="compatibility-matrix"></a><a href="#compatibility-matrix" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Compatibility matrix</h2>
<p>Currently the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.</p>
<h4><a class="anchor" aria-hidden="true" id="producer"></a><a href="#producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer</h4>
<p>APIs:</p>
<table>
<thead>
<tr><th style="text-align:left">Producer Method</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>Future&lt;RecordMetadata&gt; send(ProducerRecord&lt;K, V&gt; record)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Future&lt;RecordMetadata&gt; send(ProducerRecord&lt;K, V&gt; record, Callback callback)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void flush()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>List&lt;PartitionInfo&gt; partitionsFor(String topic)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Map&lt;MetricName, ? extends Metric&gt; metrics()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void close()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void close(long timeout, TimeUnit unit)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
</tbody>
</table>
<p>Properties:</p>
<table>
<thead>
<tr><th style="text-align:left">Config property</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>acks</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">Durability and quorum writes are configured at the namespace level</td></tr>
<tr><td style="text-align:left"><code>auto.offset.reset</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Will have a default value of <code>latest</code> if user does not give specific setting.</td></tr>
<tr><td style="text-align:left"><code>batch.size</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>block.on.buffer.full</code></td><td style="text-align:left">Yes</td><td style="text-align:left">If true it will block producer, otherwise an error is returned.</td></tr>
<tr><td style="text-align:left"><code>bootstrap.servers</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>buffer.memory</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>client.id</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>compression.type</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Allows <code>gzip</code> and <code>lz4</code>. No <code>snappy</code>.</td></tr>
<tr><td style="text-align:left"><code>connections.max.idle.ms</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Only support up to 2,147,483,647,000(Integer.MAX_VALUE * 1000) ms of idle time</td></tr>
<tr><td style="text-align:left"><code>interceptor.classes</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>key.serializer</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>linger.ms</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Controls the group commit time when batching messages</td></tr>
<tr><td style="text-align:left"><code>max.block.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>max.in.flight.requests.per.connection</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">In Pulsar ordering is maintained even with multiple requests in flight</td></tr>
<tr><td style="text-align:left"><code>max.request.size</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>metric.reporters</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>metrics.num.samples</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>metrics.sample.window.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>partitioner.class</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>receive.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>reconnect.backoff.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>request.timeout.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>retries</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">Pulsar client retries with exponential backoff until the send timeout expires.</td></tr>
<tr><td style="text-align:left"><code>send.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>timeout.ms</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>value.serializer</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="consumer"></a><a href="#consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer</h4>
<p>The following table lists consumer APIs.</p>
<table>
<thead>
<tr><th style="text-align:left">Consumer Method</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>Set&lt;TopicPartition&gt; assignment()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Set&lt;String&gt; subscription()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void subscribe(Collection&lt;String&gt; topics)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void subscribe(Collection&lt;String&gt; topics, ConsumerRebalanceListener callback)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void assign(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void subscribe(Pattern pattern, ConsumerRebalanceListener callback)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void unsubscribe()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>ConsumerRecords&lt;K, V&gt; poll(long timeoutMillis)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void commitSync()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void commitAsync()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void commitAsync(OffsetCommitCallback callback)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void commitAsync(Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets, OffsetCommitCallback callback)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void seek(TopicPartition partition, long offset)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void seekToBeginning(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void seekToEnd(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>long position(TopicPartition partition)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>OffsetAndMetadata committed(TopicPartition partition)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Map&lt;MetricName, ? extends Metric&gt; metrics()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>List&lt;PartitionInfo&gt; partitionsFor(String topic)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Map&lt;String, List&lt;PartitionInfo&gt;&gt; listTopics()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Set&lt;TopicPartition&gt; paused()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void pause(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void resume(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Map&lt;TopicPartition, OffsetAndTimestamp&gt; offsetsForTimes(Map&lt;TopicPartition, Long&gt; timestampsToSearch)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Map&lt;TopicPartition, Long&gt; beginningOffsets(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>Map&lt;TopicPartition, Long&gt; endOffsets(Collection&lt;TopicPartition&gt; partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void close()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void close(long timeout, TimeUnit unit)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>void wakeup()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr>
</tbody>
</table>
<p>Properties:</p>
<table>
<thead>
<tr><th style="text-align:left">Config property</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>group.id</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Maps to a Pulsar subscription name</td></tr>
<tr><td style="text-align:left"><code>max.poll.records</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>max.poll.interval.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">Messages are &quot;pushed&quot; from broker</td></tr>
<tr><td style="text-align:left"><code>session.timeout.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>heartbeat.interval.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>bootstrap.servers</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Needs to point to a single Pulsar service URL</td></tr>
<tr><td style="text-align:left"><code>enable.auto.commit</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>auto.commit.interval.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">With auto-commit, acks are sent immediately to broker</td></tr>
<tr><td style="text-align:left"><code>partition.assignment.strategy</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>auto.offset.reset</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Only support earliest and latest.</td></tr>
<tr><td style="text-align:left"><code>fetch.min.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>fetch.max.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>fetch.max.wait.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>interceptor.classes</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>metadata.max.age.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>max.partition.fetch.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>send.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>receive.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left"><code>client.id</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="customize-pulsar-configurations"></a><a href="#customize-pulsar-configurations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Customize Pulsar configurations</h2>
<p>You can configure Pulsar authentication provider directly from the Kafka properties.</p>
<h3><a class="anchor" aria-hidden="true" id="pulsar-client-properties"></a><a href="#pulsar-client-properties" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar client properties</h3>
<table>
<thead>
<tr><th style="text-align:left">Config property</th><th style="text-align:left">Default</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-org.apache.pulsar.client.api.Authentication-"><code>pulsar.authentication.class</code></a></td><td style="text-align:left"></td><td style="text-align:left">Configure to auth provider. For example, <code>org.apache.pulsar.client.impl.auth.AuthenticationTls</code>.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.util.Map-"><code>pulsar.authentication.params.map</code></a></td><td style="text-align:left"></td><td style="text-align:left">Map which represents parameters for the Authentication-Plugin.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.lang.String-"><code>pulsar.authentication.params.string</code></a></td><td style="text-align:left"></td><td style="text-align:left">String which represents parameters for the Authentication-Plugin, for example, <code>key1:val1,key2:val2</code>.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-"><code>pulsar.use.tls</code></a></td><td style="text-align:left"><code>false</code></td><td style="text-align:left">Enable TLS transport encryption.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-"><code>pulsar.tls.trust.certs.file.path</code></a></td><td style="text-align:left"></td><td style="text-align:left">Path for the TLS trust certificate store.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-"><code>pulsar.tls.allow.insecure.connection</code></a></td><td style="text-align:left"><code>false</code></td><td style="text-align:left">Accept self-signed certificates from brokers.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setOperationTimeout-int-java.util.concurrent.TimeUnit-"><code>pulsar.operation.timeout.ms</code></a></td><td style="text-align:left"><code>30000</code></td><td style="text-align:left">General operations timeout.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setStatsInterval-long-java.util.concurrent.TimeUnit-"><code>pulsar.stats.interval.seconds</code></a></td><td style="text-align:left"><code>60</code></td><td style="text-align:left">Pulsar client lib stats printing interval.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setIoThreads-int-"><code>pulsar.num.io.threads</code></a></td><td style="text-align:left"><code>1</code></td><td style="text-align:left">The number of Netty IO threads to use.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setConnectionsPerBroker-int-"><code>pulsar.connections.per.broker</code></a></td><td style="text-align:left"><code>1</code></td><td style="text-align:left">The maximum number of connection to each broker.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTcpNoDelay-boolean-"><code>pulsar.use.tcp.nodelay</code></a></td><td style="text-align:left"><code>true</code></td><td style="text-align:left">TCP no-delay.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setConcurrentLookupRequest-int-"><code>pulsar.concurrent.lookup.requests</code></a></td><td style="text-align:left"><code>50000</code></td><td style="text-align:left">The maximum number of concurrent topic lookups.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setMaxNumberOfRejectedRequestPerConnection-int-"><code>pulsar.max.number.rejected.request.per.connection</code></a></td><td style="text-align:left"><code>50</code></td><td style="text-align:left">The threshold of errors to forcefully close a connection.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="pulsar-producer-properties"></a><a href="#pulsar-producer-properties" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar producer properties</h3>
<table>
<thead>
<tr><th style="text-align:left">Config property</th><th style="text-align:left">Default</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setProducerName-java.lang.String-"><code>pulsar.producer.name</code></a></td><td style="text-align:left"></td><td style="text-align:left">Specify the producer name.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setInitialSequenceId-long-"><code>pulsar.producer.initial.sequence.id</code></a></td><td style="text-align:left"></td><td style="text-align:left">Specify baseline for sequence ID of this producer.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessages-int-"><code>pulsar.producer.max.pending.messages</code></a></td><td style="text-align:left"><code>1000</code></td><td style="text-align:left">Set the maximum size of the message queue pending to receive an acknowledgment from the broker.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-"><code>pulsar.producer.max.pending.messages.across.partitions</code></a></td><td style="text-align:left"><code>50000</code></td><td style="text-align:left">Set the maximum number of pending messages across all the partitions.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-"><code>pulsar.producer.batching.enabled</code></a></td><td style="text-align:left"><code>true</code></td><td style="text-align:left">Control whether automatic batching of messages is enabled for the producer.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-"><code>pulsar.producer.batching.max.messages</code></a></td><td style="text-align:left"><code>1000</code></td><td style="text-align:left">The maximum number of messages in a batch.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="pulsar-consumer-properties"></a><a href="#pulsar-consumer-properties" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar consumer Properties</h3>
<table>
<thead>
<tr><th style="text-align:left">Config property</th><th style="text-align:left">Default</th><th style="text-align:left">Notes</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerConfiguration.html#setConsumerName-java.lang.String-"><code>pulsar.consumer.name</code></a></td><td style="text-align:left"></td><td style="text-align:left">Specify the consumer name.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-"><code>pulsar.consumer.receiver.queue.size</code></a></td><td style="text-align:left">1000</td><td style="text-align:left">Set the size of the consumer receiver queue.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerBuilder.html#acknowledgmentGroupTime-long-java.util.concurrent.TimeUnit-"><code>pulsar.consumer.acknowledgments.group.time.millis</code></a></td><td style="text-align:left">100</td><td style="text-align:left">Set the maximum amount of group time for consumers to send the acknowledgments to the broker.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-"><code>pulsar.consumer.total.receiver.queue.size.across.partitions</code></a></td><td style="text-align:left">50000</td><td style="text-align:left">Set the maximum size of the total receiver queue across partitions.</td></tr>
<tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerBuilder.html#subscriptionTopicsMode-Mode-"><code>pulsar.consumer.subscription.topics.mode</code></a></td><td style="text-align:left">PersistentOnly</td><td style="text-align:left">Set the subscription topic mode for consumers.</td></tr>
</tbody>
</table>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.3.1/admin-api-schemas"><span class="arrow-prev"></span><span>Schemas</span></a><a class="docs-next button" href="/docs/en/2.3.1/adaptors-spark"><span>Apache Spark</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#using-the-pulsar-kafka-compatibility-wrapper">Using the Pulsar Kafka compatibility wrapper</a></li><li><a href="#using-the-pulsar-kafka-compatibility-wrapper-together-with-existing-kafka-client">Using the Pulsar Kafka compatibility wrapper together with existing kafka client</a></li><li><a href="#producer-example">Producer example</a></li><li><a href="#consumer-example">Consumer example</a></li><li><a href="#complete-examples">Complete Examples</a></li><li><a href="#compatibility-matrix">Compatibility matrix</a></li><li><a href="#customize-pulsar-configurations">Customize Pulsar configurations</a><ul class="toc-headings"><li><a href="#pulsar-client-properties">Pulsar client properties</a></li><li><a href="#pulsar-producer-properties">Pulsar producer properties</a></li><li><a href="#pulsar-consumer-properties">Pulsar consumer Properties</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>