| <!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar adaptor for Apache Kafka · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar provides an easy option for applications that are currently written using the [Apache Kafka](http://kafka.apache.org) Java client API."/><meta name="docsearch:version" content="2.3.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar adaptor for Apache Kafka · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar provides an easy option for applications that are currently written using the [Apache Kafka](http://kafka.apache.org) Java client API."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.3.1</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.3.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.3.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.3.1/adaptors-kafka">日本語</a></li><li><a href="/docs/fr/2.3.1/adaptors-kafka">Français</a></li><li><a href="/docs/ko/2.3.1/adaptors-kafka">한국어</a></li><li><a href="/docs/zh-CN/2.3.1/adaptors-kafka">中文</a></li><li><a href="/docs/zh-TW/2.3.1/adaptors-kafka">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script> |
| const languagesMenuItem = document.getElementById("languages-menu"); |
| const languagesDropDown = document.getElementById("languages-dropdown"); |
| languagesMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (languagesDropDown.className == "hide") { |
| languagesDropDown.className = "visible"; |
| } else { |
| languagesDropDown.className = "hide"; |
| } |
| }); |
| </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Adaptors</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-metrics">Metrics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-cdc">CDC Connector</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-getting-started">Getting Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.3.1/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script> |
| var coll = document.getElementsByClassName('collapsible'); |
| var checkActiveCategory = true; |
| for (var i = 0; i < coll.length; i++) { |
| var links = coll[i].nextElementSibling.getElementsByTagName('*'); |
| if (checkActiveCategory){ |
| for (var j = 0; j < links.length; j++) { |
| if (links[j].classList.contains('navListItemActive')){ |
| coll[i].nextElementSibling.classList.toggle('hide'); |
| coll[i].childNodes[1].classList.toggle('rotate'); |
| checkActiveCategory = false; |
| break; |
| } |
| } |
| } |
| |
| coll[i].addEventListener('click', function() { |
| var arrow = this.childNodes[1]; |
| arrow.classList.toggle('rotate'); |
| var content = this.nextElementSibling; |
| content.classList.toggle('hide'); |
| }); |
| } |
| |
| document.addEventListener('DOMContentLoaded', function() { |
| createToggler('#navToggler', '#docsNav', 'docsSliderActive'); |
| createToggler('#tocToggler', 'body', 'tocActive'); |
| |
| var headings = document.querySelector('.toc-headings'); |
| headings && headings.addEventListener('click', function(event) { |
| var el = event.target; |
| while(el !== headings){ |
| if (el.tagName === 'A') { |
| document.body.classList.remove('tocActive'); |
| break; |
| } else{ |
| el = el.parentNode; |
| } |
| } |
| }, false); |
| |
| function createToggler(togglerSelector, targetSelector, className) { |
| var toggler = document.querySelector(togglerSelector); |
| var target = document.querySelector(targetSelector); |
| |
| if (!toggler) { |
| return; |
| } |
| |
| toggler.onclick = function(event) { |
| event.preventDefault(); |
| |
| target.classList.toggle(className); |
| }; |
| } |
| }); |
| </script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/adaptors-kafka.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar adaptor for Apache Kafka</h1></header><article><div><span><p>Pulsar provides an easy option for applications that are currently written using the <a href="http://kafka.apache.org">Apache Kafka</a> Java client API.</p> |
| <h2><a class="anchor" aria-hidden="true" id="using-the-pulsar-kafka-compatibility-wrapper"></a><a href="#using-the-pulsar-kafka-compatibility-wrapper" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Using the Pulsar Kafka compatibility wrapper</h2> |
| <p>In an existing application, change the regular Kafka client dependency and replace it with the Pulsar Kafka wrapper. Remove the following dependency in <code>pom.xml</code>:</p> |
| <pre><code class="hljs css language-xml"><span class="hljs-tag"><<span class="hljs-name">dependency</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">groupId</span>></span>org.apache.kafka<span class="hljs-tag"></<span class="hljs-name">groupId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>kafka-clients<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">version</span>></span>0.10.2.1<span class="hljs-tag"></<span class="hljs-name">version</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">dependency</span>></span> |
| </code></pre> |
| <p>Then include this dependency for the Pulsar Kafka wrapper:</p> |
| <pre><code class="hljs css language-xml"><span class="hljs-tag"><<span class="hljs-name">dependency</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">groupId</span>></span>org.apache.pulsar<span class="hljs-tag"></<span class="hljs-name">groupId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>pulsar-client-kafka<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">version</span>></span>2.3.1<span class="hljs-tag"></<span class="hljs-name">version</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">dependency</span>></span> |
| </code></pre> |
| <p>With the new dependency, the existing code works without any changes. You need to adjust the configuration, and make sure it points the |
| producers and consumers to Pulsar service rather than Kafka, and uses a particular |
| Pulsar topic.</p> |
| <h2><a class="anchor" aria-hidden="true" id="using-the-pulsar-kafka-compatibility-wrapper-together-with-existing-kafka-client"></a><a href="#using-the-pulsar-kafka-compatibility-wrapper-together-with-existing-kafka-client" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Using the Pulsar Kafka compatibility wrapper together with existing kafka client</h2> |
| <p>When migrating from Kafka to Pulsar, the application might use the original kafka client |
| and the pulsar kafka wrapper together during migration. You should consider using the |
| unshaded pulsar kafka client wrapper.</p> |
| <pre><code class="hljs css language-xml"><span class="hljs-tag"><<span class="hljs-name">dependency</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">groupId</span>></span>org.apache.pulsar<span class="hljs-tag"></<span class="hljs-name">groupId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">artifactId</span>></span>pulsar-client-kafka-original<span class="hljs-tag"></<span class="hljs-name">artifactId</span>></span> |
| <span class="hljs-tag"><<span class="hljs-name">version</span>></span>2.3.1<span class="hljs-tag"></<span class="hljs-name">version</span>></span> |
| <span class="hljs-tag"></<span class="hljs-name">dependency</span>></span> |
| </code></pre> |
| <p>When using this dependency, construct producers using <code>org.apache.kafka.clients.producer.PulsarKafkaProducer</code> |
| instead of <code>org.apache.kafka.clients.producer.KafkaProducer</code> and <code>org.apache.kafka.clients.producer.PulsarKafkaConsumer</code> for consumers.</p> |
| <h2><a class="anchor" aria-hidden="true" id="producer-example"></a><a href="#producer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer example</h2> |
| <pre><code class="hljs css language-java"><span class="hljs-comment">// Topic needs to be a regular Pulsar topic</span> |
| String topic = <span class="hljs-string">"persistent://public/default/my-topic"</span>; |
| |
| Properties props = <span class="hljs-keyword">new</span> Properties(); |
| <span class="hljs-comment">// Point to a Pulsar service</span> |
| props.put(<span class="hljs-string">"bootstrap.servers"</span>, <span class="hljs-string">"pulsar://localhost:6650"</span>); |
| |
| props.put(<span class="hljs-string">"key.serializer"</span>, IntegerSerializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>; |
| props.put(<span class="hljs-string">"value.serializer"</span>, StringSerializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>; |
| |
| Producer<Integer, String> producer = <span class="hljs-keyword">new</span> KafkaProducer<>(props); |
| |
| <span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i < <span class="hljs-number">10</span>; i++) { |
| producer.send(<span class="hljs-keyword">new</span> ProducerRecord<Integer, String>(topic, i, <span class="hljs-string">"hello-"</span> + i)); |
| log.info(<span class="hljs-string">"Message {} sent successfully"</span>, i); |
| } |
| |
| producer.close(); |
| </code></pre> |
| <h2><a class="anchor" aria-hidden="true" id="consumer-example"></a><a href="#consumer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer example</h2> |
| <pre><code class="hljs css language-java">String topic = <span class="hljs-string">"persistent://public/default/my-topic"</span>; |
| |
| Properties props = <span class="hljs-keyword">new</span> Properties(); |
| <span class="hljs-comment">// Point to a Pulsar service</span> |
| props.put(<span class="hljs-string">"bootstrap.servers"</span>, <span class="hljs-string">"pulsar://localhost:6650"</span>); |
| props.put(<span class="hljs-string">"group.id"</span>, <span class="hljs-string">"my-subscription-name"</span>); |
| props.put(<span class="hljs-string">"enable.auto.commit"</span>, <span class="hljs-string">"false"</span>); |
| props.put(<span class="hljs-string">"key.deserializer"</span>, IntegerDeserializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>; |
| props.put(<span class="hljs-string">"value.deserializer"</span>, StringDeserializer<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>())</span>; |
| |
| Consumer<Integer, String> consumer = <span class="hljs-keyword">new</span> KafkaConsumer<>(props); |
| consumer.subscribe(Arrays.asList(topic)); |
| |
| <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) { |
| ConsumerRecords<Integer, String> records = consumer.poll(<span class="hljs-number">100</span>); |
| records.forEach(record -> { |
| log.info(<span class="hljs-string">"Received record: {}"</span>, record); |
| }); |
| |
| <span class="hljs-comment">// Commit last offset</span> |
| consumer.commitSync(); |
| } |
| </code></pre> |
| <h2><a class="anchor" aria-hidden="true" id="complete-examples"></a><a href="#complete-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Complete Examples</h2> |
| <p>You can find the complete producer and consumer examples |
| <a href="https://github.com/apache/pulsar/tree/master/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples">here</a>.</p> |
| <h2><a class="anchor" aria-hidden="true" id="compatibility-matrix"></a><a href="#compatibility-matrix" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Compatibility matrix</h2> |
| <p>Currently the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.</p> |
| <h4><a class="anchor" aria-hidden="true" id="producer"></a><a href="#producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer</h4> |
| <p>APIs:</p> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Producer Method</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>Future<RecordMetadata> send(ProducerRecord<K, V> record)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void flush()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>List<PartitionInfo> partitionsFor(String topic)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Map<MetricName, ? extends Metric> metrics()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void close()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void close(long timeout, TimeUnit unit)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| </tbody> |
| </table> |
| <p>Properties:</p> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Config property</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>acks</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">Durability and quorum writes are configured at the namespace level</td></tr> |
| <tr><td style="text-align:left"><code>auto.offset.reset</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Will have a default value of <code>latest</code> if user does not give specific setting.</td></tr> |
| <tr><td style="text-align:left"><code>batch.size</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>block.on.buffer.full</code></td><td style="text-align:left">Yes</td><td style="text-align:left">If true it will block producer, otherwise an error is returned.</td></tr> |
| <tr><td style="text-align:left"><code>bootstrap.servers</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>buffer.memory</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>client.id</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>compression.type</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Allows <code>gzip</code> and <code>lz4</code>. No <code>snappy</code>.</td></tr> |
| <tr><td style="text-align:left"><code>connections.max.idle.ms</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Only support up to 2,147,483,647,000(Integer.MAX_VALUE * 1000) ms of idle time</td></tr> |
| <tr><td style="text-align:left"><code>interceptor.classes</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>key.serializer</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>linger.ms</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Controls the group commit time when batching messages</td></tr> |
| <tr><td style="text-align:left"><code>max.block.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>max.in.flight.requests.per.connection</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">In Pulsar ordering is maintained even with multiple requests in flight</td></tr> |
| <tr><td style="text-align:left"><code>max.request.size</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>metric.reporters</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>metrics.num.samples</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>metrics.sample.window.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>partitioner.class</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>receive.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>reconnect.backoff.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>request.timeout.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>retries</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">Pulsar client retries with exponential backoff until the send timeout expires.</td></tr> |
| <tr><td style="text-align:left"><code>send.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>timeout.ms</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>value.serializer</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| </tbody> |
| </table> |
| <h4><a class="anchor" aria-hidden="true" id="consumer"></a><a href="#consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer</h4> |
| <p>The following table lists consumer APIs.</p> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Consumer Method</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>Set<TopicPartition> assignment()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Set<String> subscription()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void subscribe(Collection<String> topics)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void assign(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void subscribe(Pattern pattern, ConsumerRebalanceListener callback)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void unsubscribe()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>ConsumerRecords<K, V> poll(long timeoutMillis)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void commitSync()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void commitAsync()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void commitAsync(OffsetCommitCallback callback)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void seek(TopicPartition partition, long offset)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void seekToBeginning(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void seekToEnd(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>long position(TopicPartition partition)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>OffsetAndMetadata committed(TopicPartition partition)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Map<MetricName, ? extends Metric> metrics()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>List<PartitionInfo> partitionsFor(String topic)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Map<String, List<PartitionInfo>> listTopics()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Set<TopicPartition> paused()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void pause(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void resume(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void close()</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void close(long timeout, TimeUnit unit)</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>void wakeup()</code></td><td style="text-align:left">No</td><td style="text-align:left"></td></tr> |
| </tbody> |
| </table> |
| <p>Properties:</p> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Config property</th><th style="text-align:left">Supported</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>group.id</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Maps to a Pulsar subscription name</td></tr> |
| <tr><td style="text-align:left"><code>max.poll.records</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>max.poll.interval.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">Messages are "pushed" from broker</td></tr> |
| <tr><td style="text-align:left"><code>session.timeout.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>heartbeat.interval.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>bootstrap.servers</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Needs to point to a single Pulsar service URL</td></tr> |
| <tr><td style="text-align:left"><code>enable.auto.commit</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>auto.commit.interval.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left">With auto-commit, acks are sent immediately to broker</td></tr> |
| <tr><td style="text-align:left"><code>partition.assignment.strategy</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>auto.offset.reset</code></td><td style="text-align:left">Yes</td><td style="text-align:left">Only support earliest and latest.</td></tr> |
| <tr><td style="text-align:left"><code>fetch.min.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>fetch.max.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>fetch.max.wait.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>interceptor.classes</code></td><td style="text-align:left">Yes</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>metadata.max.age.ms</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>max.partition.fetch.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>send.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>receive.buffer.bytes</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| <tr><td style="text-align:left"><code>client.id</code></td><td style="text-align:left">Ignored</td><td style="text-align:left"></td></tr> |
| </tbody> |
| </table> |
| <h2><a class="anchor" aria-hidden="true" id="customize-pulsar-configurations"></a><a href="#customize-pulsar-configurations" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Customize Pulsar configurations</h2> |
| <p>You can configure Pulsar authentication provider directly from the Kafka properties.</p> |
| <h3><a class="anchor" aria-hidden="true" id="pulsar-client-properties"></a><a href="#pulsar-client-properties" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar client properties</h3> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Config property</th><th style="text-align:left">Default</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-org.apache.pulsar.client.api.Authentication-"><code>pulsar.authentication.class</code></a></td><td style="text-align:left"></td><td style="text-align:left">Configure to auth provider. For example, <code>org.apache.pulsar.client.impl.auth.AuthenticationTls</code>.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.util.Map-"><code>pulsar.authentication.params.map</code></a></td><td style="text-align:left"></td><td style="text-align:left">Map which represents parameters for the Authentication-Plugin.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setAuthentication-java.lang.String-java.lang.String-"><code>pulsar.authentication.params.string</code></a></td><td style="text-align:left"></td><td style="text-align:left">String which represents parameters for the Authentication-Plugin, for example, <code>key1:val1,key2:val2</code>.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-"><code>pulsar.use.tls</code></a></td><td style="text-align:left"><code>false</code></td><td style="text-align:left">Enable TLS transport encryption.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-"><code>pulsar.tls.trust.certs.file.path</code></a></td><td style="text-align:left"></td><td style="text-align:left">Path for the TLS trust certificate store.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-"><code>pulsar.tls.allow.insecure.connection</code></a></td><td style="text-align:left"><code>false</code></td><td style="text-align:left">Accept self-signed certificates from brokers.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setOperationTimeout-int-java.util.concurrent.TimeUnit-"><code>pulsar.operation.timeout.ms</code></a></td><td style="text-align:left"><code>30000</code></td><td style="text-align:left">General operations timeout.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setStatsInterval-long-java.util.concurrent.TimeUnit-"><code>pulsar.stats.interval.seconds</code></a></td><td style="text-align:left"><code>60</code></td><td style="text-align:left">Pulsar client lib stats printing interval.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setIoThreads-int-"><code>pulsar.num.io.threads</code></a></td><td style="text-align:left"><code>1</code></td><td style="text-align:left">The number of Netty IO threads to use.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setConnectionsPerBroker-int-"><code>pulsar.connections.per.broker</code></a></td><td style="text-align:left"><code>1</code></td><td style="text-align:left">The maximum number of connection to each broker.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTcpNoDelay-boolean-"><code>pulsar.use.tcp.nodelay</code></a></td><td style="text-align:left"><code>true</code></td><td style="text-align:left">TCP no-delay.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setConcurrentLookupRequest-int-"><code>pulsar.concurrent.lookup.requests</code></a></td><td style="text-align:left"><code>50000</code></td><td style="text-align:left">The maximum number of concurrent topic lookups.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ClientConfiguration.html#setMaxNumberOfRejectedRequestPerConnection-int-"><code>pulsar.max.number.rejected.request.per.connection</code></a></td><td style="text-align:left"><code>50</code></td><td style="text-align:left">The threshold of errors to forcefully close a connection.</td></tr> |
| </tbody> |
| </table> |
| <h3><a class="anchor" aria-hidden="true" id="pulsar-producer-properties"></a><a href="#pulsar-producer-properties" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar producer properties</h3> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Config property</th><th style="text-align:left">Default</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setProducerName-java.lang.String-"><code>pulsar.producer.name</code></a></td><td style="text-align:left"></td><td style="text-align:left">Specify the producer name.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setInitialSequenceId-long-"><code>pulsar.producer.initial.sequence.id</code></a></td><td style="text-align:left"></td><td style="text-align:left">Specify baseline for sequence ID of this producer.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessages-int-"><code>pulsar.producer.max.pending.messages</code></a></td><td style="text-align:left"><code>1000</code></td><td style="text-align:left">Set the maximum size of the message queue pending to receive an acknowledgment from the broker.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-"><code>pulsar.producer.max.pending.messages.across.partitions</code></a></td><td style="text-align:left"><code>50000</code></td><td style="text-align:left">Set the maximum number of pending messages across all the partitions.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-"><code>pulsar.producer.batching.enabled</code></a></td><td style="text-align:left"><code>true</code></td><td style="text-align:left">Control whether automatic batching of messages is enabled for the producer.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-"><code>pulsar.producer.batching.max.messages</code></a></td><td style="text-align:left"><code>1000</code></td><td style="text-align:left">The maximum number of messages in a batch.</td></tr> |
| </tbody> |
| </table> |
| <h3><a class="anchor" aria-hidden="true" id="pulsar-consumer-properties"></a><a href="#pulsar-consumer-properties" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar consumer Properties</h3> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Config property</th><th style="text-align:left">Default</th><th style="text-align:left">Notes</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerConfiguration.html#setConsumerName-java.lang.String-"><code>pulsar.consumer.name</code></a></td><td style="text-align:left"></td><td style="text-align:left">Specify the consumer name.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-"><code>pulsar.consumer.receiver.queue.size</code></a></td><td style="text-align:left">1000</td><td style="text-align:left">Set the size of the consumer receiver queue.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerBuilder.html#acknowledgmentGroupTime-long-java.util.concurrent.TimeUnit-"><code>pulsar.consumer.acknowledgments.group.time.millis</code></a></td><td style="text-align:left">100</td><td style="text-align:left">Set the maximum amount of group time for consumers to send the acknowledgments to the broker.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-"><code>pulsar.consumer.total.receiver.queue.size.across.partitions</code></a></td><td style="text-align:left">50000</td><td style="text-align:left">Set the maximum size of the total receiver queue across partitions.</td></tr> |
| <tr><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerBuilder.html#subscriptionTopicsMode-Mode-"><code>pulsar.consumer.subscription.topics.mode</code></a></td><td style="text-align:left">PersistentOnly</td><td style="text-align:left">Set the subscription topic mode for consumers.</td></tr> |
| </tbody> |
| </table> |
| </span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.3.1/admin-api-schemas"><span class="arrow-prev">← </span><span>Schemas</span></a><a class="docs-next button" href="/docs/en/2.3.1/adaptors-spark"><span>Apache Spark</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#using-the-pulsar-kafka-compatibility-wrapper">Using the Pulsar Kafka compatibility wrapper</a></li><li><a href="#using-the-pulsar-kafka-compatibility-wrapper-together-with-existing-kafka-client">Using the Pulsar Kafka compatibility wrapper together with existing kafka client</a></li><li><a href="#producer-example">Producer example</a></li><li><a href="#consumer-example">Consumer example</a></li><li><a href="#complete-examples">Complete Examples</a></li><li><a href="#compatibility-matrix">Compatibility matrix</a></li><li><a href="#customize-pulsar-configurations">Customize Pulsar configurations</a><ul class="toc-headings"><li><a href="#pulsar-client-properties">Pulsar client properties</a></li><li><a href="#pulsar-producer-properties">Pulsar producer properties</a></li><li><a href="#pulsar-consumer-properties">Pulsar consumer Properties</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script> |
| const community = document.querySelector("a[href='#community']").parentNode; |
| const communityMenu = |
| '<li>' + |
| '<a id="community-menu" href="#">Community <span style="font-size: 0.75em"> ▼</span></a>' + |
| '<div id="community-dropdown" class="hide">' + |
| '<ul id="community-dropdown-items">' + |
| '<li><a href="/en/contact">Contact</a></li>' + |
| '<li><a href="/en/contributing">Contributing</a></li>' + |
| '<li><a href="/en/coding-guide">Coding guide</a></li>' + |
| '<li><a href="/en/events">Events</a></li>' + |
| '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking ❐</a></li>' + |
| '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit ❐</a></li>' + |
| '<li> </li>' + |
| '<li><a href="/en/resources">Resources</a></li>' + |
| '<li><a href="/en/team">Team</a></li>' + |
| '<li><a href="/en/powered-by">Powered By</a></li>' + |
| '</ul>' + |
| '</div>' + |
| '</li>'; |
| |
| community.innerHTML = communityMenu; |
| |
| const communityMenuItem = document.getElementById("community-menu"); |
| const communityDropDown = document.getElementById("community-dropdown"); |
| communityMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (communityDropDown.className == 'hide') { |
| communityDropDown.className = 'visible'; |
| } else { |
| communityDropDown.className = 'hide'; |
| } |
| }); |
| </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html> |