blob: 67e4d429334a824902aff3e57fa4157091cf1dc7 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar Java client · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="You can use a Pulsar Java client to create the Java [producer](#producer), [consumer](#consumer), [reader](#reader) and [TableView](#tableview) of messages and to perform [administrative tasks](/docs/en/next/admin-api-overview). The current Java client version is **2.10.0**."/><meta name="docsearch:version" content="next"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar Java client · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="You can use a Pulsar Java client to create the Java [producer](#producer), [consumer](#consumer), [reader](#reader) and [TableView](#tableview) of messages and to perform [administrative tasks](/docs/en/next/admin-api-overview). 
The current Java client version is **2.10.0**."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>next</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/next/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/next/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" 
target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/next/client-libraries-java">日本語</a></li><li><a href="/docs/fr/next/client-libraries-java">Français</a></li><li><a href="/docs/ko/next/client-libraries-java">한국어</a></li><li><a href="/docs/zh-CN/next/client-libraries-java">中文</a></li><li><a href="/docs/zh-TW/next/client-libraries-java">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language switcher: a click on the menu entry flips the dropdown's class
// between "hide" and "visible".
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
  // Cancel the default action of the clicked menu entry.
  event.preventDefault();
  // Exact-match comparison on the whole class string, as the dropdown only
  // ever carries one of the two class names set below.
  const isHidden = languagesDropDown.className == "hide";
  languagesDropDown.className = isHidden ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-topic-compaction">Topic Compaction</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/next/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li 
class="navListItem"><a class="navItem" href="/docs/en/next/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-azure">Azure BlobStore offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-aliyun">Aliyun OSS 
offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/next/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-policy-and-supported-versions">Security Policy and Supported Versions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-athenz">Authentication using Athenz</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/next/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-extending">Extend Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries">Overview</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/next/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-dotnet">C#</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-rest">REST</a></li></ul></div><div class="navGroup"><h3 
class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/next/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/develop-plugin">Plugin</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
// Sidebar categories marked "collapsible": un-hide the first category whose
// content contains the active nav item, then make every category toggle on click.
var coll = document.getElementsByClassName('collapsible');
// Stays true until the first category containing the active item was handled.
var checkActiveCategory = true;
Array.prototype.forEach.call(coll, function(header) {
  var panel = header.nextElementSibling;
  // Looked up for every header, whether or not the active check still runs.
  var descendants = panel.getElementsByTagName('*');
  if (checkActiveCategory) {
    var holdsActiveItem = Array.prototype.some.call(descendants, function(node) {
      return node.classList.contains('navListItemActive');
    });
    if (holdsActiveItem) {
      panel.classList.toggle('hide');
      // childNodes[1] is the element whose 'rotate' class is flipped
      // (presumably the arrow icon inside the header; confirm against the markup).
      header.childNodes[1].classList.toggle('rotate');
      checkActiveCategory = false;
    }
  }
  header.addEventListener('click', function() {
    // `this` is the clicked header: flip its arrow first, then its content.
    this.childNodes[1].classList.toggle('rotate');
    this.nextElementSibling.classList.toggle('hide');
  });
});
// Once the DOM is parsed, wire up the two toggler controls and make a click on
// any link inside the `.toc-headings` element remove the `tocActive` class.
document.addEventListener('DOMContentLoaded', function() {
  /**
   * Makes a click on the first element matching `togglerSelector` toggle
   * `className` on the first element matching `targetSelector`.
   * Does nothing when no toggler element is found.
   */
  function createToggler(togglerSelector, targetSelector, className) {
    var trigger = document.querySelector(togglerSelector);
    var affected = document.querySelector(targetSelector);
    if (trigger) {
      trigger.onclick = function(event) {
        event.preventDefault();
        affected.classList.toggle(className);
      };
    }
  }

  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');

  var headings = document.querySelector('.toc-headings');
  if (headings) {
    headings.addEventListener('click', function(event) {
      // Walk up from the click target to the headings container; if an <a>
      // is found on the way, the click was on a link, so drop `tocActive`.
      for (var node = event.target; node !== headings; node = node.parentNode) {
        if (node.tagName === 'A') {
          document.body.classList.remove('tocActive');
          break;
        }
      }
    }, false);
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-java.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar Java client</h1></header><article><div><span><p>You can use a Pulsar Java client to create the Java <a href="#producer">producer</a>, <a href="#consumer">consumer</a>, <a href="#reader">reader</a> and <a href="#tableview">TableView</a> of messages and to perform <a href="/docs/en/next/admin-api-overview">administrative tasks</a>. The current Java client version is <strong>2.10.0</strong>.</p>
<p>All the methods in <a href="#producer">producer</a>, <a href="#consumer">consumer</a>, <a href="#reader">reader</a> and <a href="#tableview">TableView</a> of a Java client are thread-safe.</p>
<p>Javadoc for the Pulsar client is divided into two domains by package as follows.</p>
<table>
<thead>
<tr><th style="text-align:left">Package</th><th style="text-align:left">Description</th><th style="text-align:left">Maven Artifact</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="/api/client/2.10.0-SNAPSHOT"><code>org.apache.pulsar.client.api</code></a></td><td style="text-align:left"><a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/">The producer and consumer API</a></td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C2.10.0%7Cjar">org.apache.pulsar:pulsar-client:2.10.0</a></td></tr>
<tr><td style="text-align:left"><a href="/api/admin/2.10.0-SNAPSHOT"><code>org.apache.pulsar.client.admin</code></a></td><td style="text-align:left">The Java <a href="/docs/en/next/admin-api-overview">admin API</a></td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client-admin%7C2.10.0%7Cjar">org.apache.pulsar:pulsar-client-admin:2.10.0</a></td></tr>
<tr><td style="text-align:left"><code>org.apache.pulsar.client.all</code></td><td style="text-align:left">Include both <code>pulsar-client</code> and <code>pulsar-client-admin</code><br /> Both <code>pulsar-client</code> and <code>pulsar-client-admin</code> are shaded packages and they shade dependencies independently. Consequently, the applications using both <code>pulsar-client</code> and <code>pulsar-client-admin</code> have redundant shaded classes. It would be troublesome if you introduce new dependencies but forget to update shading rules. <br /> In this case, you can use <code>pulsar-client-all</code>, which shades dependencies only one time and reduces the size of dependencies.</td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client-all%7C2.10.0%7Cjar">org.apache.pulsar:pulsar-client-all:2.10.0</a></td></tr>
</tbody>
</table>
<p>This document focuses only on the client API for producing and consuming messages on Pulsar topics. For how to use the Java admin client, see <a href="/docs/en/next/admin-api-overview">Pulsar admin interface</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="installation"></a><a href="#installation" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installation</h2>
<p>The latest version of the Pulsar Java client library is available via <a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C2.10.0%7Cjar">Maven Central</a>. To use the latest version, add the <code>pulsar-client</code> library to your build configuration.</p>
<blockquote>
<p><strong>Tip</strong></p>
<ul>
<li><p><a href="https://search.maven.org/artifact/org.apache.pulsar/pulsar-client"><code>pulsar-client</code></a> and <a href="https://search.maven.org/artifact/org.apache.pulsar/pulsar-client-admin"><code>pulsar-client-admin</code></a> shade dependencies via <a href="https://maven.apache.org/plugins/maven-shade-plugin/">maven-shade-plugin</a> to avoid conflicts of the underlying dependency packages (such as Netty). If you do not want to manage dependency conflicts manually, you can use them.</p></li>
<li><p><a href="https://search.maven.org/artifact/org.apache.pulsar/pulsar-client-original"><code>pulsar-client-original</code></a> and <a href="https://search.maven.org/artifact/org.apache.pulsar/pulsar-client-admin-original"><code>pulsar-client-admin-original</code></a> <strong>do not</strong> shade dependencies. If you want to manage dependencies manually, you can use them.</p></li>
</ul>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="maven"></a><a href="#maven" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Maven</h3>
<p>If you use Maven, add the following information to the <code>pom.xml</code> file.</p>
<pre><code class="hljs css language-xml"><span class="hljs-comment">&lt;!-- in your &lt;properties&gt; block --&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">pulsar.version</span>&gt;</span>2.10.0<span class="hljs-tag">&lt;/<span class="hljs-name">pulsar.version</span>&gt;</span>
<span class="hljs-comment">&lt;!-- in your &lt;dependencies&gt; block --&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-client<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>${pulsar.version}<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="gradle"></a><a href="#gradle" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Gradle</h3>
<p>If you use Gradle, add the following information to the <code>build.gradle</code> file.</p>
<pre><code class="hljs css language-groovy"><span class="hljs-keyword">def</span> pulsarVersion = <span class="hljs-string">'2.10.0'</span>
dependencies {
compile <span class="hljs-string">group:</span> <span class="hljs-string">'org.apache.pulsar'</span>, <span class="hljs-string">name:</span> <span class="hljs-string">'pulsar-client'</span>, <span class="hljs-string">version:</span> pulsarVersion
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="connection-urls"></a><a href="#connection-urls" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connection URLs</h2>
<p>To connect to Pulsar using client libraries, you need to specify a <a href="/docs/en/next/developing-binary-protocol">Pulsar protocol</a> URL.</p>
<p>You can assign Pulsar protocol URLs to specific clusters and use the <code>pulsar</code> scheme. The default port is <code>6650</code>. The following is an example of <code>localhost</code>.</p>
<pre><code class="hljs css language-http">pulsar://localhost:6650
</code></pre>
<p>If you have multiple brokers, the URL is as follows.</p>
<pre><code class="hljs css language-http">pulsar://localhost:6650,localhost:6651,localhost:6652
</code></pre>
<p>A URL for a production Pulsar cluster is as follows.</p>
<pre><code class="hljs css language-http">pulsar://pulsar.us-west.example.com:6650
</code></pre>
<p>If you use <a href="/docs/en/next/security-tls-authentication">TLS</a> authentication, the URL is as follows.</p>
<pre><code class="hljs css language-http">pulsar+ssl://pulsar.us-west.example.com:6651
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="client"></a><a href="#client" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client</h2>
<p>You can instantiate a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object using just a URL for the target Pulsar <a href="/docs/en/next/reference-terminology#cluster">cluster</a> like this:</p>
<pre><code class="hljs css language-java">PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar://localhost:6650"</span>)
.build();
</code></pre>
<p>If you have multiple brokers, you can instantiate a PulsarClient like this:</p>
<pre><code class="hljs css language-java">PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar://localhost:6650,localhost:6651,localhost:6652"</span>)
.build();
</code></pre>
<blockquote>
<h3><a class="anchor" aria-hidden="true" id="default-broker-urls-for-standalone-clusters"></a><a href="#default-broker-urls-for-standalone-clusters" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Default broker URLs for standalone clusters</h3>
<p>If you run a cluster in <a href="/docs/en/next/getting-started-standalone">standalone mode</a>, the broker is available at the <code>pulsar://localhost:6650</code> URL by default.</p>
</blockquote>
<p>If you create a client, you can use the <code>loadConf</code> configuration. The following parameters are available in <code>loadConf</code>.</p>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th><div style="width:260px">Description</div></th><th>Default</th></tr>
</thead>
<tbody>
<tr><td><code>serviceUrl</code></td><td>String</td><td>Service URL provider for Pulsar service</td><td>None</td></tr>
<tr><td><code>authPluginClassName</code></td><td>String</td><td>Name of the authentication plugin</td><td>None</td></tr>
<tr><td><code>authParams</code></td><td>String</td><td>Parameters for the authentication plugin <br /><br /><strong>Example</strong><br /> key1:val1,key2:val2</td><td>None</td></tr>
<tr><td><code>operationTimeoutMs</code></td><td>long</td><td>Operation timeout</td><td>30000</td></tr>
<tr><td><code>statsIntervalSeconds</code></td><td>long</td><td>Interval between each stats information<br /><br />Stats are activated when <code>statsIntervalSeconds</code> is positive<br /><br />Set <code>statsIntervalSeconds</code> to 1 second at least.</td><td>60</td></tr>
<tr><td><code>numIoThreads</code></td><td>int</td><td>The number of threads used for handling connections to brokers</td><td>1</td></tr>
<tr><td><code>numListenerThreads</code></td><td>int</td><td>The number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers using the &quot;listener&quot; model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering. If you want multiple threads to process a single topic, you need to create a <a href="https://pulsar.apache.org/docs/en/next/concepts-messaging/#shared"><code>shared</code></a> subscription and multiple consumers for this subscription. This does not ensure ordering.</td><td>1</td></tr>
<tr><td><code>useTcpNoDelay</code></td><td>boolean</td><td>Whether to use TCP no-delay flag on the connection to disable Nagle algorithm</td><td>true</td></tr>
<tr><td><code>useTls</code></td><td>boolean</td><td>Whether to use TLS encryption on the connection</td><td>false</td></tr>
<tr><td><code>tlsTrustCertsFilePath</code></td><td>string</td><td>Path to the trusted TLS certificate file</td><td>None</td></tr>
<tr><td><code>tlsAllowInsecureConnection</code></td><td>boolean</td><td>Whether the Pulsar client accepts untrusted TLS certificate from broker</td><td>false</td></tr>
<tr><td><code>tlsHostnameVerificationEnable</code></td><td>boolean</td><td>Whether to enable TLS hostname verification</td><td>false</td></tr>
<tr><td><code>concurrentLookupRequest</code></td><td>int</td><td>The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on broker</td><td>5000</td></tr>
<tr><td><code>maxLookupRequest</code></td><td>int</td><td>The maximum number of lookup requests allowed on each broker connection to prevent overload on broker</td><td>50000</td></tr>
<tr><td><code>maxNumberOfRejectedRequestPerConnection</code></td><td>int</td><td>The maximum number of rejected requests of a broker in a certain time frame (30 seconds) after the current connection is closed and the client creates a new connection to connect to a different broker</td><td>50</td></tr>
<tr><td><code>keepAliveIntervalSeconds</code></td><td>int</td><td>Seconds of keeping alive interval for each client broker connection</td><td>30</td></tr>
<tr><td><code>connectionTimeoutMs</code></td><td>int</td><td>Duration of waiting for a connection to a broker to be established <br /><br />If the duration passes without a response from a broker, the connection attempt is dropped</td><td>10000</td></tr>
<tr><td><code>requestTimeoutMs</code></td><td>int</td><td>Maximum duration for completing a request</td><td>60000</td></tr>
<tr><td><code>defaultBackoffIntervalNanos</code></td><td>int</td><td>Default duration for a backoff interval</td><td>TimeUnit.MILLISECONDS.toNanos(100)</td></tr>
<tr><td><code>maxBackoffIntervalNanos</code></td><td>long</td><td>Maximum duration for a backoff interval</td><td>TimeUnit.SECONDS.toNanos(30)</td></tr>
<tr><td><code>socks5ProxyAddress</code></td><td>SocketAddress</td><td>SOCKS5 proxy address</td><td>None</td></tr>
<tr><td><code>socks5ProxyUsername</code></td><td>string</td><td>SOCKS5 proxy username</td><td>None</td></tr>
<tr><td><code>socks5ProxyPassword</code></td><td>string</td><td>SOCKS5 proxy password</td><td>None</td></tr>
</tbody>
</table>
<p>Check out the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
class for a full list of configurable parameters.</p>
<blockquote>
<p>In addition to client-level configuration, you can also apply <a href="#configure-producer">producer</a> and <a href="#configure-consumer">consumer</a> specific configuration as described in sections below.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="client-memory-allocator-configuration"></a><a href="#client-memory-allocator-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client memory allocator configuration</h3>
<p>You can set the client memory allocator configurations through Java properties.<br/></p>
<table>
<thead>
<tr><th>Property</th><th>Type</th><th><div>Description</div></th><th>Default</th><th>Available values</th></tr>
</thead>
<tbody>
<tr><td><code>pulsar.allocator.pooled</code></td><td>String</td><td>If set to <code>true</code>, the client uses a direct memory pool. <br /> If set to <code>false</code>, the client uses a heap memory without pool</td><td>true</td><td><li> true </li> <li> false </li></td></tr>
<tr><td><code>pulsar.allocator.exit_on_oom</code></td><td>String</td><td>Whether to exit the JVM when OOM happens</td><td>false</td><td><li> true </li> <li> false </li></td></tr>
<tr><td><code>pulsar.allocator.leak_detection</code></td><td>String</td><td>Leak detection policy for the client memory allocator</td><td>Disabled</td><td><li> Disabled </li> <li> Simple </li> <li> Advanced </li> <li> Paranoid </li></td></tr>
<tr><td><code>pulsar.allocator.out_of_memory_policy</code></td><td>String</td><td>When an OOM occurs, the client throws an exception or falls back to heap</td><td>FallbackToHeap</td><td><li> ThrowException </li> <li> FallbackToHeap </li></td></tr>
</tbody>
</table>
<p><strong>Example</strong>:</p>
<pre><code class="hljs"><span class="hljs-attr">-Dpulsar.allocator.pooled</span>=<span class="hljs-literal">true</span>
<span class="hljs-attr">-Dpulsar.allocator.exit_on_oom</span>=<span class="hljs-literal">false</span>
<span class="hljs-attr">-Dpulsar.allocator.leak_detection</span>=Disabled
<span class="hljs-attr">-Dpulsar.allocator.out_of_memory_policy</span>=ThrowException
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="cluster-level-failover"></a><a href="#cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Cluster-level failover</h3>
<p>This chapter describes the concept, benefits, use cases, constraints, usage, working principles, and more information about the cluster-level failover. It contains the following sections:</p>
<ul>
<li><p><a href="#what-is-cluster-level-failover">What is cluster-level failover?</a></p>
<ul>
<li><p><a href="#concept-of-cluster-level-failover">Concept of cluster-level failover</a></p></li>
<li><p><a href="#why-use-cluster-level-failover">Why use cluster-level failover?</a></p></li>
<li><p><a href="#when-to-use-cluster-level-failover">When to use cluster-level failover?</a></p></li>
<li><p><a href="#when-cluster-level-failover-is-triggered">When cluster-level failover is triggered?</a></p></li>
<li><p><a href="#why-does-cluster-level-failover-fail">Why does cluster-level failover fail?</a></p></li>
<li><p><a href="#what-are-the-limitations-of-cluster-level-failover">What are the limitations of cluster-level failover?</a></p></li>
<li><p><a href="#what-are-the-relationships-between-cluster-level-failover-and-geo-replication">What are the relationships between cluster-level failover and geo-replication?</a></p></li>
</ul></li>
<li><p><a href="#how-to-use-cluster-level-failover">How to use cluster-level failover?</a></p></li>
<li><p><a href="#how-does-cluster-level-failover-work">How does cluster-level failover work?</a></p></li>
</ul>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="what-is-cluster-level-failover"></a><a href="#what-is-cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>What is cluster-level failover</h4>
</blockquote>
<p>This chapter helps you better understand the concept of cluster-level failover.</p>
<blockquote>
<h5><a class="anchor" aria-hidden="true" id="concept-of-cluster-level-failover"></a><a href="#concept-of-cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Concept of cluster-level failover</h5>
</blockquote>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-495-tab-496" class="nav-link active" data-group="group_495" data-tab="tab-group-495-content-496">Automatic cluster-level failover</div><div id="tab-group-495-tab-497" class="nav-link" data-group="group_495" data-tab="tab-group-495-content-497">Controlled cluster-level failover</div></div><div class="tab-content"><div id="tab-group-495-content-496" class="tab-pane active" data-group="group_495" tabindex="-1"><div><span><p>Automatic cluster-level failover supports Pulsar clients switching from a primary cluster to one or several backup clusters automatically and seamlessly when it detects a failover event based on the configured detecting policy set by <strong>users</strong>.</p>
<p><img src="/docs/assets/cluster-level-failover-1.png" alt="Automatic cluster-level failover"></p>
</span></div></div><div id="tab-group-495-content-497" class="tab-pane" data-group="group_495" tabindex="-1"><div><span><p>Controlled cluster-level failover supports Pulsar clients switching from a primary cluster to one or several backup clusters. The switchover is manually set by <strong>administrators</strong>.</p>
<p><img src="/docs/assets/cluster-level-failover-2.png" alt="Controlled cluster-level failover"></p>
</span></div></div></div></div>
<p>Once the primary cluster functions again, Pulsar clients can switch back to the primary cluster. Most of the time users won’t even notice a thing. Users can keep using applications and services without interruptions or timeouts.</p>
<blockquote>
<h5><a class="anchor" aria-hidden="true" id="why-use-cluster-level-failover"></a><a href="#why-use-cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Why use cluster-level failover?</h5>
</blockquote>
<p>The cluster-level failover provides fault tolerance, continuous availability, and high availability together. It brings a number of benefits, including but not limited to:</p>
<ul>
<li><p>Reduced cost: services can be switched and recovered automatically with no data loss.</p></li>
<li><p>Simplified management: businesses can operate on an “always-on” basis since no immediate user intervention is required.</p></li>
<li><p>Improved stability and robustness: it ensures continuous performance and minimizes service downtime.</p></li>
</ul>
<blockquote>
<h5><a class="anchor" aria-hidden="true" id="when-to-use-cluster-level-failover"></a><a href="#when-to-use-cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>When to use cluster-level failover?</h5>
</blockquote>
<p>The cluster-level failover protects your environment in a number of ways, including but not limited to:</p>
<ul>
<li><p>Disaster recovery: cluster-level failover can automatically and seamlessly transfer the production workload on a primary cluster to one or several backup clusters, which ensures minimum data loss and reduced recovery time.</p></li>
<li><p>Planned migration: if you want to migrate production workloads from an old cluster to a new cluster, you can improve the migration efficiency with cluster-level failover. For example, you can test whether the data migration goes smoothly in case of a failover event, identify possible issues and risks before the migration.</p></li>
</ul>
<blockquote>
<h5><a class="anchor" aria-hidden="true" id="when-cluster-level-failover-is-triggered"></a><a href="#when-cluster-level-failover-is-triggered" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>When cluster-level failover is triggered?</h5>
</blockquote>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-498-tab-499" class="nav-link active" data-group="group_498" data-tab="tab-group-498-content-499">Automatic cluster-level failover</div><div id="tab-group-498-tab-500" class="nav-link" data-group="group_498" data-tab="tab-group-498-content-500">Controlled cluster-level failover</div></div><div class="tab-content"><div id="tab-group-498-content-499" class="tab-pane active" data-group="group_498" tabindex="-1"><div><span><p>Automatic cluster-level failover is triggered when Pulsar clients cannot connect to the primary cluster for a prolonged period of time. This can be caused by any number of reasons including, but not limited to:</p>
<ul>
<li><p>Network failure: internet connection is lost.</p></li>
<li><p>Power failure: shutdown time of a primary cluster exceeds time limits.</p></li>
<li><p>Service error: errors occur on a primary cluster (for example, the primary cluster does not function because of time limits).</p></li>
<li><p>Crashed storage space: the primary cluster does not have enough storage space, but the corresponding storage space on the backup server functions normally.</p></li>
</ul>
</span></div></div><div id="tab-group-498-content-500" class="tab-pane" data-group="group_498" tabindex="-1"><div><span><p>Controlled cluster-level failover is triggered when administrators set the switchover manually.</p>
</span></div></div></div></div>
<blockquote>
<h5><a class="anchor" aria-hidden="true" id="why-does-cluster-level-failover-fail"></a><a href="#why-does-cluster-level-failover-fail" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Why does cluster-level failover fail?</h5>
</blockquote>
<p>Obviously, the cluster-level failover does not succeed if the backup cluster is unreachable by active Pulsar clients. This can happen for many reasons, including but not limited to:</p>
<ul>
<li><p>Power failure: the backup cluster is shut down or does not function normally.</p></li>
<li><p>Crashed storage space: primary and backup clusters do not have enough storage space.</p></li>
<li><p>If the failover is initiated, but no cluster can assume the role of an available cluster due to errors, and the primary cluster is not able to provide service normally.</p></li>
<li><p>If you manually initiate a switchover, but services cannot be switched to the backup cluster server, then the system will attempt to switch services back to the primary cluster.</p></li>
<li><p>Fail to authenticate or authorize between 1) primary and backup clusters, or 2) between two backup clusters.</p></li>
</ul>
<blockquote>
<h5><a class="anchor" aria-hidden="true" id="what-are-the-limitations-of-cluster-level-failover"></a><a href="#what-are-the-limitations-of-cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>What are the limitations of cluster-level failover?</h5>
</blockquote>
<p>Currently, cluster-level failover can perform probes to prevent data loss, but it cannot check the status of backup clusters. If backup clusters are not healthy, you cannot produce or consume data.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="what-are-the-relationships-between-cluster-level-failover-and-geo-replication"></a><a href="#what-are-the-relationships-between-cluster-level-failover-and-geo-replication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>What are the relationships between cluster-level failover and geo-replication?</h4>
</blockquote>
<p>The cluster-level failover is an extension of <a href="/docs/en/next/concepts-replication">geo-replication</a> to improve stability and robustness. The cluster-level failover depends on geo-replication, and they have some <strong>differences</strong> as below.</p>
<table>
<thead>
<tr><th>Influence</th><th>Cluster-level failover</th><th>Geo-replication</th></tr>
</thead>
<tbody>
<tr><td>Do administrators have heavy workloads?</td><td>No or maybe.<br /><br />- For the <strong>automatic</strong> cluster-level failover, the cluster switchover is triggered automatically based on the policies set by <strong>users</strong>.<br /><br />- For the <strong>controlled</strong> cluster-level failover, the switchover is triggered manually by <strong>administrators</strong>.</td><td>Yes.<br /><br />If a cluster fails, immediate administration intervention is required.</td></tr>
<tr><td>Result in data loss?</td><td>No.<br /><br />For both <strong>automatic</strong> and <strong>controlled</strong> cluster-level failover, if the failed primary cluster doesn't replicate messages immediately to the backup cluster, the Pulsar client can't consume the non-replicated messages. After the primary cluster is restored and the Pulsar client switches back, the non-replicated data can still be consumed by the Pulsar client. Consequently, the data is not lost.<br /><br />- For the <strong>automatic</strong> cluster-level failover, services can be switched and recovered automatically with no data loss.<br /><br />- For the <strong>controlled</strong> cluster-level failover, services can be switched and recovered manually and data loss may happen.</td><td>Yes.<br /><br />Pulsar clients and DNS systems have caches. When administrators switch the DNS from a primary cluster to a backup cluster, it takes some time for cache trigger timeout, which delays client recovery time and fails to produce or consume messages.</td></tr>
<tr><td>Result in Pulsar client failure?</td><td>No or maybe.<br /><br />- For <strong>automatic</strong> cluster-level failover, services can be switched and recovered automatically and the Pulsar client does not fail. <br /><br />- For <strong>controlled</strong> cluster-level failover, services can be switched and recovered manually, but the Pulsar client fails before administrators can take action.</td><td>Same as above.</td></tr>
</tbody>
</table>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="how-to-use-cluster-level-failover"></a><a href="#how-to-use-cluster-level-failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How to use cluster-level failover</h4>
</blockquote>
<p>This section guides you through every step on how to configure cluster-level failover.</p>
<p><strong>Tip</strong></p>
<ul>
<li><p>You should configure cluster-level failover only when the cluster contains sufficient resources to handle all possible consequences. Workload intensity on the backup cluster may increase significantly.</p></li>
<li><p>Connect clusters to an uninterruptible power supply (UPS) unit to reduce the risk of unexpected power loss.</p></li>
</ul>
<p><strong>Requirements</strong></p>
<ul>
<li><p>Pulsar client 2.10 or later versions.</p></li>
<li><p>For backup clusters:</p>
<ul>
<li><p>The number of BookKeeper nodes should be equal to or greater than the ensemble quorum.</p></li>
<li><p>The number of ZooKeeper nodes should be equal to or greater than 3.</p></li>
</ul></li>
<li><p><strong>Turn on geo-replication</strong> between the primary cluster and any dependent cluster (primary to backup or backup to backup) to prevent data loss.</p></li>
<li><p>Set <code>replicateSubscriptionState</code> to <code>true</code> when creating consumers.</p></li>
</ul>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-501-tab-502" class="nav-link active" data-group="group_501" data-tab="tab-group-501-content-502">Automatic cluster-level failover</div><div id="tab-group-501-tab-503" class="nav-link" data-group="group_501" data-tab="tab-group-501-content-503">Controlled cluster-level failover</div></div><div class="tab-content"><div id="tab-group-501-content-502" class="tab-pane active" data-group="group_501" tabindex="-1"><div><span><p>This is an example of how to construct a Java Pulsar client to use automatic cluster-level failover. The switchover is triggered automatically.</p>
<pre><code class="hljs"> private PulsarClient getAutoFailoverClient() throws PulsarClientException {<br /><br /> ServiceUrlProvider failover = AutoClusterFailover<span class="hljs-number">.</span>builder()<br /><span class="hljs-meta"> .primary</span>(<span class="hljs-string">"pulsar://localhost:6650"</span>)<br /><span class="hljs-meta"> .secondary</span>(Arrays<span class="hljs-number">.</span>asList(<span class="hljs-string">"pulsar://other1:6650"</span>,<span class="hljs-string">"pulsar://other2:6650"</span>))<br /><span class="hljs-meta"> .failoverDelay</span>(<span class="hljs-number">30</span>, TimeUnit<span class="hljs-number">.</span>SECONDS)<br /><span class="hljs-meta"> .switchBackDelay</span>(<span class="hljs-number">60</span>, TimeUnit<span class="hljs-number">.</span>SECONDS)<br /><span class="hljs-meta"> .checkInterval</span>(<span class="hljs-number">1000</span>, TimeUnit<span class="hljs-number">.</span>MILLISECONDS)<br /><span class="hljs-meta"> .secondaryTlsTrustCertsFilePath</span>(<span class="hljs-string">"/path/to/ca.cert.pem"</span>)<br /><span class="hljs-meta"> .secondaryAuthentication</span>(<span class="hljs-string">"org.apache.pulsar.client.impl.auth.AuthenticationTls"</span>,<br /><span class="hljs-string">"tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem"</span>)<br /><span class="hljs-meta"><br /> .build</span>()<span class="hljs-comment">;</span><br /><br /> PulsarClient pulsarClient = PulsarClient<span class="hljs-number">.</span>builder()<br /><span class="hljs-meta"> .build</span>()<span class="hljs-comment">;</span><br /><br /> failover<span class="hljs-number">.</span>initialize(pulsarClient)<span class="hljs-comment">;</span><br /> return pulsarClient<span class="hljs-comment">;</span><br /> }<br /></code></pre>
<p>Configure the following parameters:</p>
<table>
<thead>
<tr><th>Parameter</th><th>Default value</th><th>Required?</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>primary</code></td><td>N/A</td><td>Yes</td><td>Service URL of the primary cluster.</td></tr>
<tr><td><code>secondary</code></td><td>N/A</td><td>Yes</td><td>Service URL(s) of one or several backup clusters.<br /><br/>You can specify several backup clusters using a comma-separated list.<br /><br/> Note that:<br />- The backup cluster is chosen in the sequence shown in the list. <br />- If all backup clusters are available, the Pulsar client chooses the first backup cluster.</td></tr>
<tr><td><code>failoverDelay</code></td><td>N/A</td><td>Yes</td><td>The delay before the Pulsar client switches from the primary cluster to the backup cluster.<br /><br/>Automatic failover is controlled by a probe task: <br />1) The probe task first checks the health status of the primary cluster. <br /> 2) If the probe task finds the continuous failure time of the primary cluster exceeds <code>failoverDelay</code>, it switches the Pulsar client to the backup cluster.</td></tr>
<tr><td><code>switchBackDelay</code></td><td>N/A</td><td>Yes</td><td>The delay before the Pulsar client switches from the backup cluster to the primary cluster.<br /><br/>Automatic failover switchover is controlled by a probe task: <br /> 1) After the Pulsar client switches from the primary cluster to the backup cluster, the probe task continues to check the status of the primary cluster. <br /> 2) If the primary cluster functions well and continuously remains active longer than <code>switchBackDelay</code>, the Pulsar client switches back to the primary cluster.</td></tr>
<tr><td><code>checkInterval</code></td><td>30s</td><td>No</td><td>Frequency of performing a probe task (in seconds).</td></tr>
<tr><td><code>secondaryTlsTrustCertsFilePath</code></td><td>N/A</td><td>No</td><td>Path to the trusted TLS certificate file of the backup cluster.</td></tr>
<tr><td><code>secondaryAuthentication</code></td><td>N/A</td><td>No</td><td>Authentication of the backup cluster.</td></tr>
</tbody>
</table>
</span></div></div><div id="tab-group-501-content-503" class="tab-pane" data-group="group_501" tabindex="-1"><div><span><p>This is an example of how to construct a Java Pulsar client to use controlled cluster-level failover. The switchover is triggered by administrators manually.</p>
<p><strong>Note</strong>: you can have one or several backup clusters but can only specify one.</p>
<pre><code class="hljs"> <span class="hljs-keyword">public</span> PulsarClient getControlledFailoverClient() throws IOException {<br /><span class="hljs-built_in">Map</span>&lt;<span class="hljs-built_in">String</span>, <span class="hljs-built_in">String</span>&gt; <span class="hljs-keyword">header</span> = <span class="hljs-literal">new</span> HashMap&lt;&gt;(); <br /> <span class="hljs-keyword">header</span>.put("service_user_id", "my<span class="hljs-params">-user</span>");<br /> <span class="hljs-keyword">header</span>.put("service_password", "tiger");<br /> <span class="hljs-keyword">header</span>.put("clusterA", "tokenA");<br /> <span class="hljs-keyword">header</span>.put("clusterB", "tokenB");<br /><br /> ServiceUrlProvider provider = <br /> ControlledClusterFailover.builder()<br /> .defaultServiceUrl(<span class="hljs-string">"pulsar://localhost:6650"</span>)<br /> .checkInterval(<span class="hljs-number">1</span>, TimeUnit.MINUTES)<br /> .urlProvider(<span class="hljs-string">"http://localhost:8080/test"</span>)<br /> .urlProviderHeader(<span class="hljs-keyword">header</span>)<br /> .build();<br /><br /> PulsarClient pulsarClient = <br /> PulsarClient.builder()<br /> .build();<br /><br /> provider.initialize(pulsarClient);<br /> <span class="hljs-keyword">return</span> pulsarClient;<br />}<br /><br /></code></pre>
<table>
<thead>
<tr><th>Parameter</th><th>Default value</th><th>Required?</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>defaultServiceUrl</code></td><td>N/A</td><td>Yes</td><td>Pulsar service URL.</td></tr>
<tr><td><code>checkInterval</code></td><td>30s</td><td>No</td><td>Frequency of performing a probe task (in seconds).</td></tr>
<tr><td><code>urlProvider</code></td><td>N/A</td><td>Yes</td><td>URL provider service.</td></tr>
<tr><td><code>urlProviderHeader</code></td><td>N/A</td><td>No</td><td><code>urlProviderHeader</code> is a map containing tokens and credentials. <br /><br />If you enable authentication or authorization between Pulsar clients and primary and backup clusters, you need to provide <code>urlProviderHeader</code>.</td></tr>
</tbody>
</table>
<p>Here is an example of how <code>urlProviderHeader</code> works.</p>
<p><img src="/docs/assets/cluster-level-failover-3.png" alt="How urlProviderHeader works"></p>
<p>Assume that you want to connect Pulsar client 1 to cluster A.</p>
<ol>
<li><p>Pulsar client 1 sends the token <em>t1</em> to the URL provider service.</p></li>
<li><p>The URL provider service returns the credential <em>c1</em> and the cluster A URL to the Pulsar client.</p>
<p>The URL provider service manages all tokens and credentials. It returns different credentials based on different tokens and different target cluster URLs to different Pulsar clients.</p>
<p><strong>Note</strong>: <strong>the credential must be in a JSON file and contain parameters as shown</strong>.</p>
<pre><code class="hljs">{<br />"serviceUrl": "pulsar+ssl://target:6651", <br />"tlsTrustCertsFilePath": "/security/ca.cert.pem",<br />"authPluginClassName":"org.apache.pulsar.client.impl.auth.AuthenticationTls",<br />"authParamsString": " \"tlsCertFile\": \"/<span class="hljs-keyword">security</span>/client.cert.pem\" <br /> \"tlsKeyFile\": \"/<span class="hljs-keyword">security</span>/client-pk8.pem\" "<br />}<br /></code></pre></li>
<li><p>Pulsar client 1 connects to cluster A using credential <em>c1</em>.</p></li>
</ol>
</span></div></div></div></div>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="how-does-cluster-level-failover-work"></a><a href="#how-does-cluster-level-failover-work" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>How does cluster-level failover work?</h4>
</blockquote>
<p>This chapter explains the working process of cluster-level failover. For more implementation details, see <a href="https://github.com/apache/pulsar/issues/13315">PIP-121</a>.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-504-tab-505" class="nav-link active" data-group="group_504" data-tab="tab-group-504-content-505">Automatic cluster-level failover</div><div id="tab-group-504-tab-506" class="nav-link" data-group="group_504" data-tab="tab-group-504-content-506">Controlled cluster-level failover</div></div><div class="tab-content"><div id="tab-group-504-content-505" class="tab-pane active" data-group="group_504" tabindex="-1"><div><span><p>In an automatic failover cluster, the primary cluster and backup cluster are aware of each other's availability. The automatic failover cluster performs the following actions without administrator intervention:</p>
<ol>
<li><p>The Pulsar client runs a probe task at intervals defined in <code>checkInterval</code>.</p></li>
<li><p>If the probe task finds the failure time of the primary cluster exceeds the time set in the <code>failoverDelay</code> parameter, it searches backup clusters for an available healthy cluster.</p>
<p>2a) If there are healthy backup clusters, the Pulsar client switches to a backup cluster in the order defined in <code>secondary</code>.</p>
<p>2b) If there is no healthy backup cluster, the Pulsar client does not perform the switchover, and the probe task continues to look for an available backup cluster.</p></li>
<li><p>The probe task checks whether the primary cluster functions well or not.</p>
<p>3a) If the primary cluster comes back and the continuous healthy time exceeds the time set in <code>switchBackDelay</code>, the Pulsar client switches back to the primary cluster.</p>
<p>3b) If the primary cluster does not come back, the Pulsar client does not perform the switchover.</p></li>
</ol>
<p><img src="/docs/assets/cluster-level-failover-4.png" alt="Workflow of automatic failover cluster"></p>
</span></div></div><div id="tab-group-504-content-506" class="tab-pane" data-group="group_504" tabindex="-1"><div><span><ol>
<li><p>The Pulsar client runs a probe task at intervals defined in <code>checkInterval</code>.</p></li>
<li><p>The probe task fetches the service URL configuration from the URL provider service, which is configured by <code>urlProvider</code>.</p>
<p>2a) If the service URL configuration is changed, the probe task switches to the target cluster without checking the health status of the target cluster.</p>
<p>2b) If the service URL configuration is not changed, the Pulsar client does not perform the switchover.</p></li>
<li><p>If the Pulsar client switches to the target cluster, the probe task continues to fetch service URL configuration from the URL provider service at intervals defined in <code>checkInterval</code>.</p>
<p>3a) If the service URL configuration is changed, the probe task switches to the target cluster without checking the health status of the target cluster.</p>
<p>3b) If the service URL configuration is not changed, it does not perform the switchover.</p></li>
</ol>
<p><img src="/docs/assets/cluster-level-failover-5.png" alt="Workflow of controlled failover cluster"></p>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="producer"></a><a href="#producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer</h2>
<p>In Pulsar, producers write messages to topics. Once you've instantiated a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object (as in the section <a href="#client-configuration">above</a>), you can create a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/Producer">Producer</a>
for a specific Pulsar <a href="/docs/en/next/reference-terminology#topic">topic</a>.</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.create();
<span class="hljs-comment">// You can then send messages to the broker and topic you specified:</span>
producer.send(<span class="hljs-string">"My message"</span>.getBytes());
</code></pre>
<p>By default, producers produce messages that consist of byte arrays. You can produce different types by specifying a message <a href="#schema">schema</a>.</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; stringProducer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"my-topic"</span>)
.create();
stringProducer.send(<span class="hljs-string">"My message"</span>);
</code></pre>
<blockquote>
<p>Make sure that you close your producers, consumers, and clients when you do not need them.</p>
<pre><code class="hljs css language-java">producer.close();
consumer.close();
client.close();
</code></pre>
<p>Close operations can also be asynchronous:</p>
<pre><code class="hljs css language-java">producer.closeAsync()
.thenRun(() -&gt; System.out.println(<span class="hljs-string">"Producer closed"</span>))
.exceptionally((ex) -&gt; {
System.err.println(<span class="hljs-string">"Failed to close producer: "</span> + ex);
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
});
</code></pre>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="configure-producer"></a><a href="#configure-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure producer</h3>
<p>If you instantiate a <code>Producer</code> object by specifying only a topic name as in the example above, the default configuration of the producer is used.</p>
<p>If you create a producer, you can use the <code>loadConf</code> configuration. The following parameters are available in <code>loadConf</code>.</p>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th><div style="width:300px">Description</div></th><th>Default</th></tr>
</thead>
<tbody>
<tr><td><code>topicName</code></td><td>string</td><td>Topic name</td><td>null</td></tr>
<tr><td><code>producerName</code></td><td>string</td><td>Producer name</td><td>null</td></tr>
<tr><td><code>sendTimeoutMs</code></td><td>long</td><td>Message send timeout in ms.<br />If a message is not acknowledged by a server before the <code>sendTimeout</code> expires, an error occurs.</td><td>30000</td></tr>
<tr><td><code>blockIfQueueFull</code></td><td>boolean</td><td>If it is set to <code>true</code>, when the outgoing message queue is full, the <code>send</code> and <code>sendAsync</code> methods of the producer block, rather than failing and throwing errors. <br />If it is set to <code>false</code>, when the outgoing message queue is full, the <code>send</code> and <code>sendAsync</code> methods of the producer fail and <code>ProducerQueueIsFullError</code> exceptions occur.<br /><br />The <code>maxPendingMessages</code> parameter determines the size of the outgoing message queue.</td><td>false</td></tr>
<tr><td><code>maxPendingMessages</code></td><td>int</td><td>The maximum size of a queue holding pending messages.<br /><br />For example, a message waiting to receive an acknowledgment from a <a href="/docs/en/next/reference-terminology#broker">broker</a>. <br /><br />By default, when the queue is full, all calls to the <code>send</code> and <code>sendAsync</code> methods fail <strong>unless</strong> you set <code>blockIfQueueFull</code> to <code>true</code>.</td><td>1000</td></tr>
<tr><td><code>maxPendingMessagesAcrossPartitions</code></td><td>int</td><td>The maximum number of pending messages across partitions. <br /><br />Use the setting to lower the max pending messages for each partition (<code>maxPendingMessages</code>) if the total number exceeds the configured value.</td><td>50000</td></tr>
<tr><td><code>messageRoutingMode</code></td><td>MessageRoutingMode</td><td>Message routing logic for producers on <a href="/docs/en/next/concepts-architecture-overview#partitioned-topics">partitioned topics</a>.<br /> Apply the logic only when setting no key on messages. <br />Available options are as follows: <br /><li><code>pulsar.RoundRobinDistribution</code>: round robin</li><li><code>pulsar.UseSinglePartition</code>: publish all messages to a single partition</li><li><code>pulsar.CustomPartition</code>: a custom partitioning scheme</li></td><td><code>pulsar.RoundRobinDistribution</code></td></tr>
<tr><td><code>hashingScheme</code></td><td>HashingScheme</td><td>Hashing function determining the partition where you publish a particular message (<strong>partitioned topics only</strong>).<br />Available options are as follows:<br /><li> <code>pulsar.JavaStringHash</code>: the equivalent of <code>String.hashCode()</code> in Java</li><li> <code>pulsar.Murmur3_32Hash</code>: applies the <a href="https://en.wikipedia.org/wiki/MurmurHash">Murmur3</a> hashing function</li><li><code>pulsar.BoostHash</code>: applies the hashing function from C++'s <a href="https://www.boost.org/doc/libs/1_62_0/doc/html/hash.html">Boost</a> library</li></td><td><code>HashingScheme.JavaStringHash</code></td></tr>
<tr><td><code>cryptoFailureAction</code></td><td>ProducerCryptoFailureAction</td><td>Producer should take action when encryption fails.<br /><li><strong>FAIL</strong>: if encryption fails, unencrypted messages fail to send.</li><li> <strong>SEND</strong>: if encryption fails, unencrypted messages are sent.</li></td><td><code>ProducerCryptoFailureAction.FAIL</code></td></tr>
<tr><td><code>batchingMaxPublishDelayMicros</code></td><td>long</td><td>Batching time period of sending messages.</td><td>TimeUnit.MILLISECONDS.toMicros(1)</td></tr>
<tr><td><code>batchingMaxMessages</code></td><td>int</td><td>The maximum number of messages permitted in a batch.</td><td>1000</td></tr>
<tr><td><code>batchingEnabled</code></td><td>boolean</td><td>Enable batching of messages.</td><td>true</td></tr>
<tr><td><code>chunkingEnabled</code></td><td>boolean</td><td>Enable chunking of messages.</td><td>false</td></tr>
<tr><td><code>compressionType</code></td><td>CompressionType</td><td>Message data compression type used by a producer. <br />Available options:<li><a href="https://github.com/lz4/lz4"><code>LZ4</code></a></li><li><a href="https://zlib.net/"><code>ZLIB</code></a></li><li><a href="https://facebook.github.io/zstd/"><code>ZSTD</code></a></li><li><a href="https://google.github.io/snappy/"><code>SNAPPY</code></a></li></td><td>No compression</td></tr>
<tr><td><code>initialSubscriptionName</code></td><td>string</td><td>Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created.</td><td>null</td></tr>
</tbody>
</table>
<p>You can configure parameters if you do not want to use the default configuration.</p>
<p>For a full list, see the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/ProducerBuilder">ProducerBuilder</a>
class. The following is an example.</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.batchingMaxPublishDelay(<span class="hljs-number">10</span>, TimeUnit.MILLISECONDS)
.sendTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.blockIfQueueFull(<span class="hljs-keyword">true</span>)
.create();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="message-routing"></a><a href="#message-routing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Message routing</h3>
<p>When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more information on specifying a routing mode using the Java client, see the <a href="/docs/en/next/cookbooks-partitioned">Partitioned Topics cookbook</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="async-send"></a><a href="#async-send" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Async send</h3>
<p>You can publish messages <a href="/docs/en/next/concepts-messaging#send-modes">asynchronously</a> using the Java client. With async send, the producer puts the message in a blocking queue and returns immediately. Then the client library sends the message to the broker in the background. If the queue is full (max size configurable), the producer is blocked or fails immediately when calling the API, depending on arguments passed to the producer.</p>
<p>The following is an example.</p>
<pre><code class="hljs css language-java">producer.sendAsync(<span class="hljs-string">"my-async-message"</span>.getBytes()).thenAccept(msgId -&gt; {
System.out.println(<span class="hljs-string">"Message with ID "</span> + msgId + <span class="hljs-string">" successfully sent"</span>);
});
</code></pre>
<p>As you can see from the example above, async send operations return a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/MessageId">MessageId</a>
wrapped in a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a>.</p>
<h3><a class="anchor" aria-hidden="true" id="configure-messages"></a><a href="#configure-messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure messages</h3>
<p>In addition to a value, you can set additional items on a given message:</p>
<pre><code class="hljs css language-java">producer.newMessage()
.key(<span class="hljs-string">"my-message-key"</span>)
.value(<span class="hljs-string">"my-async-message"</span>.getBytes())
.property(<span class="hljs-string">"my-key"</span>, <span class="hljs-string">"my-value"</span>)
.property(<span class="hljs-string">"my-other-key"</span>, <span class="hljs-string">"my-other-value"</span>)
.send();
</code></pre>
<p>You can also terminate the builder chain with <code>sendAsync()</code>, which returns a future.</p>
<h3><a class="anchor" aria-hidden="true" id="enable-chunking"></a><a href="#enable-chunking" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Enable chunking</h3>
<p>Message <a href="/docs/en/next/concepts-messaging#chunking">chunking</a> enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages on the consumer side.</p>
<p>The message chunking feature is OFF by default. The following is an example of how to enable message chunking when creating a producer.</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(topic)
.enableChunking(<span class="hljs-keyword">true</span>)
.enableBatching(<span class="hljs-keyword">false</span>)
.create();
</code></pre>
<p>By default, the producer chunks a large message based on the max message size (<code>maxMessageSize</code>) configured at the broker (for example, 5 MB). However, the client can also configure the max chunk size using the producer configuration <code>chunkMaxMessageSize</code>.</p>
<blockquote>
<p><strong>Note:</strong> To enable chunking, you need to disable batching (<code>enableBatching</code>=<code>false</code>) concurrently.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="consumer"></a><a href="#consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer</h2>
<p>In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new <a href="/docs/en/next/reference-terminology#consumer">consumer</a> by first instantiating a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object and passing it a URL for a Pulsar broker (as <a href="#client-configuration">above</a>).</p>
<p>Once you've instantiated a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object, you can create a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer">Consumer</a>
by specifying a <a href="/docs/en/next/reference-terminology#topic">topic</a> and a <a href="/docs/en/next/concepts-messaging#subscription-types">subscription</a>.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscribe();
</code></pre>
<p>The <code>subscribe</code> method will auto-subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a <code>while</code> loop. In this example loop, the consumer listens for messages, prints the contents of any received message, and then <a href="/docs/en/next/reference-terminology#acknowledgment-ack">acknowledges</a> that the message has been processed. If the processing logic fails, you can use <a href="/docs/en/next/reference-terminology#acknowledgment-ack">negative acknowledgement</a> to redeliver the message later.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
<span class="hljs-comment">// Wait for a message</span>
Message msg = consumer.receive();
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">// Do something with the message</span>
System.out.println(<span class="hljs-string">"Message received: "</span> + <span class="hljs-keyword">new</span> String(msg.getData()));
<span class="hljs-comment">// Acknowledge the message so that it can be deleted by the message broker</span>
consumer.acknowledge(msg);
} <span class="hljs-keyword">catch</span> (Exception e) {
<span class="hljs-comment">// Message failed to process, redeliver later</span>
consumer.negativeAcknowledge(msg);
}
}
</code></pre>
<p>If you don't want to block your main thread and rather listen constantly for new messages, consider using a <code>MessageListener</code>.</p>
<pre><code class="hljs css language-java">MessageListener myMessageListener = (consumer, msg) -&gt; {
<span class="hljs-keyword">try</span> {
System.out.println(<span class="hljs-string">"Message received: "</span> + <span class="hljs-keyword">new</span> String(msg.getData()));
consumer.acknowledge(msg);
} <span class="hljs-keyword">catch</span> (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.messageListener(myMessageListener)
.subscribe();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="configure-consumer"></a><a href="#configure-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure consumer</h3>
<p>If you instantiate a <code>Consumer</code> object by specifying only a topic and subscription name as in the example above, the consumer uses the default configuration.</p>
<p>When you create a consumer, you can use the <code>loadConf</code> configuration. The following parameters are available in <code>loadConf</code>.</p>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th><div style="width:300px">Description</div></th><th>Default</th></tr>
</thead>
<tbody>
<tr><td><code>topicNames</code></td><td>Set&lt;String&gt;</td><td>Topic name</td><td>Sets.newTreeSet()</td></tr>
<tr><td><code>topicsPattern</code></td><td>Pattern</td><td>Topic pattern</td><td>None</td></tr>
<tr><td><code>subscriptionName</code></td><td>String</td><td>Subscription name</td><td>None</td></tr>
<tr><td><code>subscriptionType</code></td><td>SubscriptionType</td><td>Subscription type <br />Four subscription types are available:<li>Exclusive</li><li>Failover</li><li>Shared</li><li>Key_Shared</li></td><td>SubscriptionType.Exclusive</td></tr>
<tr><td><code>receiverQueueSize</code></td><td>int</td><td>Size of a consumer's receiver queue. <br /><br />For example, the number of messages accumulated by a consumer before an application calls <code>Receive</code>. <br /><br />A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.</td><td>1000</td></tr>
<tr><td><code>acknowledgementsGroupTimeMicros</code></td><td>long</td><td>Group a consumer acknowledgment for a specified time.<br /><br />By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.<br /><br />Setting a group time of 0 sends out acknowledgments immediately. <br /><br />A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.</td><td>TimeUnit.MILLISECONDS.toMicros(100)</td></tr>
<tr><td><code>negativeAckRedeliveryDelayMicros</code></td><td>long</td><td>Delay to wait before redelivering messages that failed to be processed.<br /><br /> When an application uses <code>Consumer#negativeAcknowledge(Message)</code>, failed messages are redelivered after a fixed timeout.</td><td>TimeUnit.MINUTES.toMicros(1)</td></tr>
<tr><td><code>maxTotalReceiverQueueSizeAcrossPartitions</code></td><td>int</td><td>The max total receiver queue size across partitions.<br /><br />This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.</td><td>50000</td></tr>
<tr><td><code>consumerName</code></td><td>String</td><td>Consumer name</td><td>null</td></tr>
<tr><td><code>ackTimeoutMillis</code></td><td>long</td><td>Timeout of unacked messages</td><td>0</td></tr>
<tr><td><code>tickDurationMillis</code></td><td>long</td><td>Granularity of the ack-timeout redelivery.<br /><br />Using a higher <code>tickDurationMillis</code> reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).</td><td>1000</td></tr>
<tr><td><code>priorityLevel</code></td><td>int</td><td>Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. <br /><br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br /><br />In Shared subscription type, the broker <strong>first dispatches messages to the max priority level consumers if they have permits</strong>. Otherwise, the broker considers next priority level consumers.<br /><br /> <strong>Example 1</strong><br />If a subscription has consumerA with <code>priorityLevel</code> 0 and consumerB with <code>priorityLevel</code> 1, then the broker <strong>only dispatches messages to consumerA until it runs out of permits</strong> and then starts dispatching messages to consumerB.<br /><br /><strong>Example 2</strong><br />Consumer, PriorityLevel, Permits<br />C1, 0, 2<br />C2, 0, 1<br />C3, 0, 1<br />C4, 1, 2<br />C5, 1, 1<br /><br />Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.</td><td>0</td></tr>
<tr><td><code>cryptoFailureAction</code></td><td>ConsumerCryptoFailureAction</td><td>Consumer should take action when it receives a message that cannot be decrypted.<br /><li><strong>FAIL</strong>: this is the default option to fail messages until crypto succeeds.</li><li> <strong>DISCARD</strong>: silently acknowledge and not deliver message to an application.</li><li><strong>CONSUME</strong>: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.</li><br />The decompression of message fails. <br /><br />If messages contain batch messages, a client is not able to retrieve individual messages in batch.<br /><br />Delivered encrypted message contains <code>EncryptionContext</code> which contains encryption and compression information in it using which application can decrypt consumed message payload.</td><td>ConsumerCryptoFailureAction.FAIL</td></tr>
<tr><td><code>properties</code></td><td>SortedMap&lt;String, String&gt;</td><td>A name or value property of this consumer.<br /><br /><code>properties</code> is application defined metadata attached to a consumer. <br /><br />When getting a topic stats, associate this metadata with the consumer stats for easier identification.</td><td>new TreeMap&lt;&gt;()</td></tr>
<tr><td><code>readCompacted</code></td><td>boolean</td><td>If enabling <code>readCompacted</code>, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.<br /><br /> A consumer only sees the latest value for each key in the compacted topic, up until the point in the topic message backlog that has been compacted. Beyond that point, messages are sent as normal.<br /><br />Only enable <code>readCompacted</code> on subscriptions to persistent topics that have a single active consumer (like failover or exclusive subscriptions). <br /><br />Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a <code>PulsarClientException</code>.</td><td>false</td></tr>
<tr><td><code>subscriptionInitialPosition</code></td><td>SubscriptionInitialPosition</td><td>Initial position at which to set the cursor when subscribing to a topic for the first time.</td><td>SubscriptionInitialPosition.Latest</td></tr>
<tr><td><code>patternAutoDiscoveryPeriod</code></td><td>int</td><td>Topic auto discovery period when using a pattern for topic's consumer.<br /><br />The default and minimum value is 1 minute.</td><td>1</td></tr>
<tr><td><code>regexSubscriptionMode</code></td><td>RegexSubscriptionMode</td><td>When subscribing to a topic using a regular expression, you can pick a certain type of topics.<br /><br /><li><strong>PersistentOnly</strong>: only subscribe to persistent topics.</li><li><strong>NonPersistentOnly</strong>: only subscribe to non-persistent topics.</li><li><strong>AllTopics</strong>: subscribe to both persistent and non-persistent topics.</li></td><td>RegexSubscriptionMode.PersistentOnly</td></tr>
<tr><td><code>deadLetterPolicy</code></td><td>DeadLetterPolicy</td><td>Dead letter policy for consumers.<br /><br />By default, some messages are probably redelivered many times, even to the extent that it never stops.<br /><br />By using the dead letter mechanism, messages have the max redelivery count. <strong>When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically</strong>.<br /><br />You can enable the dead letter mechanism by setting <code>deadLetterPolicy</code>.<br /><br /><strong>Example</strong><br /><br /><code>client.newConsumer()<br />.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())<br />.subscribe();</code><br /><br />Default dead letter topic name is <code>{TopicName}-{Subscription}-DLQ</code>.<br /><br />To set a custom dead letter topic name:<br /><code>client.newConsumer()<br />.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10)<br />.deadLetterTopic(&quot;your-topic-name&quot;).build())<br />.subscribe();</code><br /><br />When specifying the dead letter policy while not specifying <code>ackTimeoutMillis</code>, you can set the ack timeout to 30000 millisecond.</td><td>None</td></tr>
<tr><td><code>autoUpdatePartitions</code></td><td>boolean</td><td>If <code>autoUpdatePartitions</code> is enabled, a consumer subscribes to partition increases automatically.<br /><br /><strong>Note</strong>: this is only for partitioned consumers.</td><td>true</td></tr>
<tr><td><code>replicateSubscriptionState</code></td><td>boolean</td><td>If <code>replicateSubscriptionState</code> is enabled, a subscription state is replicated to geo-replicated clusters.</td><td>false</td></tr>
<tr><td><code>negativeAckRedeliveryBackoff</code></td><td>RedeliveryBackoff</td><td>Interface for a custom redelivery policy applied when a message is negatively acknowledged. You can specify <code>RedeliveryBackoff</code> for a consumer.</td><td><code>MultiplierRedeliveryBackoff</code></td></tr>
<tr><td><code>ackTimeoutRedeliveryBackoff</code></td><td>RedeliveryBackoff</td><td>Interface for a custom redelivery policy applied when a message acknowledgment times out. You can specify <code>RedeliveryBackoff</code> for a consumer.</td><td><code>MultiplierRedeliveryBackoff</code></td></tr>
<tr><td><code>autoAckOldestChunkedMessageOnQueueFull</code></td><td>boolean</td><td>Whether to automatically acknowledge pending chunked messages when the threshold of <code>maxPendingChunkedMessage</code> is reached. If set to <code>false</code>, these messages will be redelivered by their broker.</td><td>true</td></tr>
<tr><td><code>maxPendingChunkedMessage</code></td><td>int</td><td>The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization.</td><td>10</td></tr>
<tr><td><code>expireTimeOfIncompleteChunkedMessageMillis</code></td><td>long</td><td>The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute.</td><td>60000</td></tr>
</tbody>
</table>
<p>You can configure parameters if you do not want to use the default configuration. For a full list, see the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerBuilder">ConsumerBuilder</a>
class.</p>
<p>The following is an example.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.ackTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="async-receive"></a><a href="#async-receive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Async receive</h3>
<p>The <code>receive</code> method receives messages synchronously (the consumer process is blocked until a message is available). You can also use <a href="/docs/en/next/concepts-messaging#receive-modes">async receive</a>, which returns a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a> object immediately once a new message is available.</p>
<p>The following is an example.</p>
<pre><code class="hljs css language-java">CompletableFuture&lt;Message&gt; asyncMessage = consumer.receiveAsync();
</code></pre>
<p>Async receive operations return a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/Message">Message</a>
wrapped inside of a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a>.</p>
<h3><a class="anchor" aria-hidden="true" id="batch-receive"></a><a href="#batch-receive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Batch receive</h3>
<p>Use <code>batchReceive</code> to receive multiple messages for each call.</p>
<p>The following is an example.</p>
<pre><code class="hljs css language-java">Messages messages = consumer.batchReceive();
<span class="hljs-keyword">for</span> (Object message : messages) {
<span class="hljs-comment">// do something</span>
}
consumer.acknowledge(messages);
</code></pre>
<blockquote>
<p>Note:</p>
<p>Batch receive policy limits the number and bytes of messages in a single batch. You can specify a timeout to wait for enough messages.</p>
<p>The batch receive is completed if any of the following conditions is met: enough number of messages, bytes of messages, wait timeout.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.batchReceivePolicy(BatchReceivePolicy.builder()
.maxNumMessages(<span class="hljs-number">100</span>)
.maxNumBytes(<span class="hljs-number">1024</span> * <span class="hljs-number">1024</span>)
.timeout(<span class="hljs-number">200</span>, TimeUnit.MILLISECONDS)
.build())
.subscribe();
</code></pre>
<p>The default batch receive policy is:</p>
<pre><code class="hljs css language-java">BatchReceivePolicy.builder()
.maxNumMessages(-<span class="hljs-number">1</span>)
.maxNumBytes(<span class="hljs-number">10</span> * <span class="hljs-number">1024</span> * <span class="hljs-number">1024</span>)
.timeout(<span class="hljs-number">100</span>, TimeUnit.MILLISECONDS)
.build();
</code></pre>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="configure-chunking"></a><a href="#configure-chunking" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure chunking</h3>
<p>You can limit the maximum number of chunked messages a consumer maintains concurrently by configuring the <code>maxPendingChunkedMessage</code> and <code>autoAckOldestChunkedMessageOnQueueFull</code> parameters. When the threshold is reached, the consumer drops pending messages by silently acknowledging them or asking the broker to redeliver them later. The <code>expireTimeOfIncompleteChunkedMessage</code> parameter decides the time interval to expire incomplete chunks if the consumer fails to receive all chunks of a message within the specified time period.</p>
<p>The following is an example of how to configure message chunking.</p>
<pre><code class="hljs css language-java">Consumer&lt;<span class="hljs-keyword">byte</span>[]&gt; consumer = client.newConsumer()
.topic(topic)
.subscriptionName(<span class="hljs-string">"test"</span>)
.autoAckOldestChunkedMessageOnQueueFull(<span class="hljs-keyword">true</span>)
.maxPendingChunkedMessage(<span class="hljs-number">100</span>)
.expireTimeOfIncompleteChunkedMessage(<span class="hljs-number">10</span>, TimeUnit.MINUTES)
.subscribe();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="negative-acknowledgment-redelivery-backoff"></a><a href="#negative-acknowledgment-redelivery-backoff" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Negative acknowledgment redelivery backoff</h3>
<p>The <code>RedeliveryBackoff</code> introduces a redelivery backoff mechanism. You can achieve redelivery with different delays by setting <code>redeliveryCount</code> of messages.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(<span class="hljs-number">1000</span>)
.maxDelayMs(<span class="hljs-number">60</span> * <span class="hljs-number">1000</span>)
.build())
.subscribe();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="acknowledgement-timeout-redelivery-backoff"></a><a href="#acknowledgement-timeout-redelivery-backoff" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Acknowledgement timeout redelivery backoff</h3>
<p>The <code>RedeliveryBackoff</code> introduces a redelivery backoff mechanism. You can redeliver messages with different delays by setting the number
of times a message is retried.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.ackTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(<span class="hljs-number">1000</span>)
.maxDelayMs(<span class="hljs-number">60000</span>)
.multiplier(<span class="hljs-number">2</span>)
.build())
.subscribe();
</code></pre>
<p>The message redelivery behavior should be as follows.</p>
<table>
<thead>
<tr><th style="text-align:left">Redelivery count</th><th style="text-align:left">Redelivery delay</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">1</td><td style="text-align:left">10 + 1 seconds</td></tr>
<tr><td style="text-align:left">2</td><td style="text-align:left">10 + 2 seconds</td></tr>
<tr><td style="text-align:left">3</td><td style="text-align:left">10 + 4 seconds</td></tr>
<tr><td style="text-align:left">4</td><td style="text-align:left">10 + 8 seconds</td></tr>
<tr><td style="text-align:left">5</td><td style="text-align:left">10 + 16 seconds</td></tr>
<tr><td style="text-align:left">6</td><td style="text-align:left">10 + 32 seconds</td></tr>
<tr><td style="text-align:left">7</td><td style="text-align:left">10 + 60 seconds</td></tr>
<tr><td style="text-align:left">8</td><td style="text-align:left">10 + 60 seconds</td></tr>
</tbody>
</table>
<blockquote>
<p><strong>Note</strong></p>
<ul>
<li>The <code>negativeAckRedeliveryBackoff</code> does not work with <code>consumer.negativeAcknowledge(MessageId messageId)</code> because you are not able to get the redelivery count from the message ID.</li>
<li>If a consumer crashes, it triggers the redelivery of unacked messages. In this case, <code>RedeliveryBackoff</code> does not take effect and the messages might get redelivered earlier than the delay time from the backoff.</li>
</ul>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="multi-topic-subscriptions"></a><a href="#multi-topic-subscriptions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Multi-topic subscriptions</h3>
<p>In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using <a href="/docs/en/next/concepts-messaging#multi-topic-subscriptions">multi-topic subscriptions</a>. To use multi-topic subscriptions you can supply either a regular expression (regex) or a <code>List</code> of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.</p>
<p>The following are some examples.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.client.api.Consumer;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.PulsarClient;
<span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> java.util.List;
<span class="hljs-keyword">import</span> java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
<span class="hljs-comment">// Subscribe to all topics in a namespace</span>
Pattern allTopicsInNamespace = Pattern.compile(<span class="hljs-string">"public/default/.*"</span>);
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
<span class="hljs-comment">// Subscribe to a subset of topics in a namespace, based on regex</span>
Pattern someTopicsInNamespace = Pattern.compile(<span class="hljs-string">"public/default/foo.*"</span>);
Consumer someTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
</code></pre>
<p>In the above example, the consumer subscribes to the <code>persistent</code> topics that can match the topic name pattern. If you want the consumer to subscribe to all <code>persistent</code> and <code>non-persistent</code> topics that can match the topic name pattern, set <code>subscriptionTopicsMode</code> to <code>RegexSubscriptionMode.AllTopics</code>.</p>
<pre><code class="hljs css language-java">Pattern pattern = Pattern.compile(<span class="hljs-string">"public/default/.*"</span>);
pulsarClient.newConsumer()
.subscriptionName(<span class="hljs-string">"my-sub"</span>)
.topicsPattern(pattern)
.subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
.subscribe();
</code></pre>
<blockquote>
<p><strong>Note</strong> <br>
By default, the <code>subscriptionTopicsMode</code> of the consumer is <code>PersistentOnly</code>. Available options of <code>subscriptionTopicsMode</code> are <code>PersistentOnly</code>, <code>NonPersistentOnly</code>, and <code>AllTopics</code>.</p>
</blockquote>
<p>You can also subscribe to an explicit list of topics (across namespaces if you wish):</p>
<pre><code class="hljs css language-java">List&lt;String&gt; topics = Arrays.asList(
<span class="hljs-string">"topic-1"</span>,
<span class="hljs-string">"topic-2"</span>,
<span class="hljs-string">"topic-3"</span>
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
<span class="hljs-comment">// Alternatively:</span>
Consumer multiTopicConsumer = consumerBuilder
.topic(
<span class="hljs-string">"topic-1"</span>,
<span class="hljs-string">"topic-2"</span>,
<span class="hljs-string">"topic-3"</span>
)
.subscribe();
</code></pre>
<p>You can also subscribe to multiple topics asynchronously using the <code>subscribeAsync</code> method rather than the synchronous <code>subscribe</code> method. The following is an example.</p>
<pre><code class="hljs css language-java">Pattern allTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default.*"</span>);
consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribeAsync()
.thenAccept(<span class="hljs-keyword">this</span>::receiveMessageFromConsumer);
<span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">void</span> <span class="hljs-title">receiveMessageFromConsumer</span><span class="hljs-params">(Object consumer)</span> </span>{
((Consumer)consumer).receiveAsync().thenAccept(message -&gt; {
<span class="hljs-comment">// Do something with the received message</span>
receiveMessageFromConsumer(consumer);
});
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="subscription-types"></a><a href="#subscription-types" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Subscription types</h3>
<p>Pulsar has various <a href="concepts-messaging#subscription-types">subscription types</a> to match different scenarios. A topic can have multiple subscriptions with different subscription types. However, a subscription can only have one subscription type at a time.</p>
<p>A subscription is identified by its subscription name; a subscription name can specify only one subscription type at a time. To change the subscription type, you should first stop all consumers of this subscription.</p>
<p>Different subscription types have different message distribution types. This section describes the differences between subscription types and how to use them.</p>
<p>To better describe their differences, assume you have a topic named &quot;my-topic&quot;, and the producer has published 10 messages.</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; producer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"my-topic"</span>)
.enableBatching(<span class="hljs-keyword">false</span>)
.create();
<span class="hljs-comment">// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"</span>
producer.newMessage().key(<span class="hljs-string">"key-1"</span>).value(<span class="hljs-string">"message-1-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-1"</span>).value(<span class="hljs-string">"message-1-2"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-1"</span>).value(<span class="hljs-string">"message-1-3"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-2"</span>).value(<span class="hljs-string">"message-2-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-2"</span>).value(<span class="hljs-string">"message-2-2"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-2"</span>).value(<span class="hljs-string">"message-2-3"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-3"</span>).value(<span class="hljs-string">"message-3-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-3"</span>).value(<span class="hljs-string">"message-3-2"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-4"</span>).value(<span class="hljs-string">"message-4-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-4"</span>).value(<span class="hljs-string">"message-4-2"</span>).send();
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="exclusive"></a><a href="#exclusive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Exclusive</h4>
<p>Create a new consumer and subscribe with the <code>Exclusive</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
</code></pre>
<p>Only the first consumer is allowed to attach to the subscription; other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.</p>
<blockquote>
<p>Note:</p>
<p>If the topic is a partitioned topic, the first consumer subscribes to all partitions, and other consumers are not assigned any partitions and receive an error.</p>
</blockquote>
<h4><a class="anchor" aria-hidden="true" id="failover"></a><a href="#failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Failover</h4>
<p>Create new consumers and subscribe with the <code>Failover</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer1 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
<span class="hljs-comment">//consumer1 is the active consumer, consumer2 is the standby consumer.</span>
<span class="hljs-comment">//consumer1 receives 5 messages and then crashes, consumer2 takes over as an active consumer.</span>
</code></pre>
<p>Multiple consumers can attach to the same subscription, yet only the first consumer is active, and others are standby. When the active consumer is disconnected, messages will be dispatched to one of the standby consumers, and that standby consumer then becomes the active consumer.</p>
<p>If the first active consumer is disconnected after receiving 5 messages, the standby consumer becomes the active consumer. consumer1 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-1"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-2"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-3"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-1"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-2"</span>)
</code></pre>
<p>consumer2 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-3"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-1"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-2"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-1"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-2"</span>)
</code></pre>
<blockquote>
<p>Note:</p>
<p>If a topic is a partitioned topic, each partition has only one active consumer, messages of one partition are distributed to only one consumer, and messages of multiple partitions are distributed to multiple consumers.</p>
</blockquote>
<h4><a class="anchor" aria-hidden="true" id="shared"></a><a href="#shared" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Shared</h4>
<p>Create new consumers and subscribe with <code>Shared</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer1 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
<span class="hljs-comment">//Both consumer1 and consumer2 are active consumers.</span>
</code></pre>
<p>In Shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round-robin distribution across consumers.</p>
<p>If a broker dispatches only one message at a time, consumer1 receives the following information.</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-1"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-3"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-2"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-1"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-1"</span>)
</code></pre>
<p>consumer2 receives the following information.</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-2"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-1"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-3"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-2"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-2"</span>)
</code></pre>
<p><code>Shared</code> subscription is different from <code>Exclusive</code> and <code>Failover</code> subscription types. <code>Shared</code> subscription has better flexibility, but cannot provide order guarantee.</p>
<h4><a class="anchor" aria-hidden="true" id="key_shared"></a><a href="#key_shared" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Key_shared</h4>
<p>This is a new subscription type since 2.4.0 release. Create new consumers and subscribe with <code>Key_Shared</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer1 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
<span class="hljs-comment">//Both consumer1 and consumer2 are active consumers.</span>
</code></pre>
<p>Just like in <code>Shared</code> subscription, all consumers in <code>Key_Shared</code> subscription type can attach to the same subscription. But <code>Key_Shared</code> subscription type is different from the <code>Shared</code> subscription. In <code>Key_Shared</code> subscription type, messages with the same key are delivered to only one consumer in order. The following is a possible distribution of messages between different consumers (by default, we do not know in advance which keys will be assigned to a consumer, but a key will only be assigned to one consumer at a time).</p>
<p>consumer1 receives the following information.</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-1"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-2"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-3"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-1"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-2"</span>)
</code></pre>
<p>consumer2 receives the following information.</p>
<pre><code class="hljs">(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-1"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-2"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-3"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-1"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-2"</span>)
</code></pre>
<p>If batching is enabled at the producer side, messages with different keys are added to a batch by default. The broker will dispatch the batch to the consumer, so the default batch mechanism may break the Key_Shared subscription guaranteed message distribution semantics. The producer needs to use the <code>KeyBasedBatcher</code>.</p>
<pre><code class="hljs css language-java">Producer producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
</code></pre>
<p>Or the producer can disable batching.</p>
<pre><code class="hljs css language-java">Producer producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.enableBatching(<span class="hljs-keyword">false</span>)
.create();
</code></pre>
<blockquote>
<p>Note:</p>
<p>If the message key is not specified, messages without a key are dispatched to one consumer in order by default.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="reader"></a><a href="#reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader</h2>
<p>With the <a href="/docs/en/next/concepts-clients#reader-interface">reader interface</a>, Pulsar clients can &quot;manually position&quot; themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/Reader">Reader</a>
objects by specifying a topic and a <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/MessageId">MessageId</a>.</p>
<p>The following is an example.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">byte</span>[] msgIdBytes = <span class="hljs-comment">// Some message ID byte array</span>
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
<span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
Message message = reader.readNext();
<span class="hljs-comment">// Process message</span>
}
</code></pre>
<p>In the example above, a <code>Reader</code> object is instantiated for a specific topic and message (by ID); the reader iterates over each message in the topic after the message identified by <code>msgIdBytes</code> (how that value is obtained depends on the application).</p>
<p>The code sample above shows pointing the <code>Reader</code> object to a specific message (by ID), but you can also use <code>MessageId.earliest</code> to point to the earliest available message on the topic or <code>MessageId.latest</code> to point to the most recent available message.</p>
<h3><a class="anchor" aria-hidden="true" id="configure-reader"></a><a href="#configure-reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure reader</h3>
<p>When you create a reader, you can use the <code>loadConf</code> configuration. The following parameters are available in <code>loadConf</code>.</p>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th><div style="width:300px">Description</div></th><th>Default</th></tr>
</thead>
<tbody>
<tr><td><code>topicName</code></td><td>String</td><td>Topic name.</td><td>None</td></tr>
<tr><td><code>receiverQueueSize</code></td><td>int</td><td>Size of a consumer's receiver queue.<br /><br />For example, the number of messages that can be accumulated by a consumer before an application calls <code>Receive</code>.<br /><br />A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.</td><td>1000</td></tr>
<tr><td><code>readerListener</code></td><td>ReaderListener&lt;T&gt;</td><td>A listener that is called for each message received.</td><td>None</td></tr>
<tr><td><code>readerName</code></td><td>String</td><td>Reader name.</td><td>null</td></tr>
<tr><td><code>subscriptionName</code></td><td>String</td><td>Subscription name</td><td>When there is a single topic, the default subscription name is <code>&quot;reader-&quot; + 10-digit UUID</code>.<br />When there are multiple topics, the default subscription name is <code>&quot;multiTopicsReader-&quot; + 10-digit UUID</code>.</td></tr>
<tr><td><code>subscriptionRolePrefix</code></td><td>String</td><td>Prefix of subscription role.</td><td>null</td></tr>
<tr><td><code>cryptoKeyReader</code></td><td>CryptoKeyReader</td><td>Interface that abstracts the access to a key store.</td><td>null</td></tr>
<tr><td><code>cryptoFailureAction</code></td><td>ConsumerCryptoFailureAction</td><td>The action that a consumer takes when it receives a message that cannot be decrypted.<ul><li><strong>FAIL</strong>: this is the default option to fail messages until crypto succeeds.</li><li><strong>DISCARD</strong>: silently acknowledge the message and do not deliver it to an application.</li><li><strong>CONSUME</strong>: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.</li></ul>The message decompression fails. <br /><br />If messages contain batch messages, a client is not able to retrieve individual messages in batch.<br /><br />The delivered encrypted message contains <code>EncryptionContext</code>, which holds the encryption and compression information that the application can use to decrypt the consumed message payload.</td><td>ConsumerCryptoFailureAction.FAIL</td></tr>
<tr><td><code>readCompacted</code></td><td>boolean</td><td>If <code>readCompacted</code> is enabled, a consumer reads messages from a compacted topic rather than the full message backlog of a topic.<br /><br /> A consumer only sees the latest value for each key in the compacted topic, up until the point in the topic message backlog that has been compacted. Beyond that point, messages are sent as normal.<br /><br /><code>readCompacted</code> can only be enabled on subscriptions to persistent topics, which have a single active consumer (for example, failover or exclusive subscriptions). <br /><br />Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a <code>PulsarClientException</code>.</td><td>false</td></tr>
<tr><td><code>resetIncludeHead</code></td><td>boolean</td><td>If set to true, the first message to be returned is the one specified by <code>messageId</code>.<br /><br />If set to false, the first message to be returned is the one next to the message specified by <code>messageId</code>.</td><td>false</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="sticky-key-range-reader"></a><a href="#sticky-key-range-reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Sticky key range reader</h3>
<p>With a sticky key range reader, the broker dispatches only the messages whose message key hash falls within the specified key hash range. Multiple key hash ranges can be specified on a reader.</p>
<p>The following is an example to create a sticky key range reader.</p>
<pre><code class="hljs css language-java">pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.keyHashRange(Range.of(<span class="hljs-number">0</span>, <span class="hljs-number">10000</span>), Range.of(<span class="hljs-number">20001</span>, <span class="hljs-number">30000</span>))
.create();
</code></pre>
<p>Total hash range size is 65536, so the max end of the range should be less than or equal to 65535.</p>
<h3><a class="anchor" aria-hidden="true" id="configure-chunking-1"></a><a href="#configure-chunking-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure chunking</h3>
<p>Configuring chunking for readers is similar to that for consumers. See <a href="#configure-chunking">configure chunking for consumers</a> for more information.</p>
<p>The following is an example of how to configure message chunking for a reader.</p>
<pre><code class="hljs css language-java">Reader&lt;<span class="hljs-keyword">byte</span>[]&gt; reader = pulsarClient.newReader()
.topic(topicName)
.startMessageId(MessageId.earliest)
.maxPendingChunkedMessage(<span class="hljs-number">12</span>)
.autoAckOldestChunkedMessageOnQueueFull(<span class="hljs-keyword">true</span>)
.expireTimeOfIncompleteChunkedMessage(<span class="hljs-number">12</span>, TimeUnit.MILLISECONDS)
.create();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="create-reader-with-interceptor"></a><a href="#create-reader-with-interceptor" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create reader with interceptor</h3>
<p>Pulsar reader interceptor intercepts and possibly mutates messages with user-defined processing before <a href="/docs/en/next/concepts-clients#reader-interface">Pulsar reader</a> reads them. With reader interceptors, you can apply unified messaging processes before messages can be read, such as modifying messages, adding properties, collecting statistics, and so on, without creating similar mechanisms respectively.</p>
<p><img src="/docs/assets/reader-interceptor.svg" alt="Reader interceptor"></p>
<p>Pulsar reader interceptor works on top of Pulsar consumer interceptor. The plugin interface <code>ReaderInterceptor</code> can be treated as a subset of <code>ConsumerInterceptor</code> and it has two main events.</p>
<ul>
<li><code>beforeRead</code> is triggered before readers read messages. You can modify messages within this event.</li>
<li><code>onPartitionsChange</code> is triggered when changes on partitions have been detected.</li>
</ul>
<p>To perceive triggered events and perform customized processing, you can add <code>ReaderInterceptor</code> when creating a <code>Reader</code> as follows.</p>
<pre><code class="hljs css language-java">PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(<span class="hljs-string">"pulsar://localhost:6650"</span>).build();
Reader&lt;<span class="hljs-keyword">byte</span>[]&gt; reader = pulsarClient.newReader()
.topic(<span class="hljs-string">"t1"</span>)
.autoUpdatePartitionsInterval(<span class="hljs-number">5</span>, TimeUnit.SECONDS)
.intercept(<span class="hljs-keyword">new</span> ReaderInterceptor&lt;<span class="hljs-keyword">byte</span>[]&gt;() {
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">close</span><span class="hljs-params">()</span> </span>{
}
<span class="hljs-meta">@Override</span>
<span class="hljs-keyword">public</span> Message&lt;<span class="hljs-keyword">byte</span>[]&gt; beforeRead(Reader&lt;<span class="hljs-keyword">byte</span>[]&gt; reader, Message&lt;<span class="hljs-keyword">byte</span>[]&gt; message) {
<span class="hljs-comment">// user-defined processing logic</span>
<span class="hljs-keyword">return</span> message;
}
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">onPartitionsChange</span><span class="hljs-params">(String topicName, <span class="hljs-keyword">int</span> partitions)</span> </span>{
<span class="hljs-comment">// user-defined processing logic</span>
}
})
.startMessageId(MessageId.earliest)
.create();
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="tableview"></a><a href="#tableview" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>TableView</h2>
<p>The TableView interface serves an encapsulated access pattern, providing a continuously updated key-value map view of the compacted topic data. Messages without keys will be ignored.</p>
<p>With TableView, Pulsar clients can fetch all the message updates from a topic and construct a map with the latest values of each key. These values can then be used to build a local cache of data. In addition, you can register consumers with the TableView by specifying a listener to perform a scan of the map and then receive notifications when new messages are received. Consequently, event handling can be triggered to serve use cases, such as event-driven applications and message monitoring.</p>
<blockquote>
<p><strong>Note</strong></p>
<p>Each TableView uses one Reader instance per partition, and reads the topic starting from the compacted view by default. It is highly recommended to enable automatic compaction by <a href="/docs/en/next/cookbooks-compaction#configuring-compaction-to-run-automatically">configuring the topic compaction policies</a> for the given topic or namespace. More frequent compaction results in shorter startup times because less data is replayed to reconstruct the TableView of the topic.</p>
</blockquote>
<p>The following figure illustrates the dynamic construction of a TableView updated with newer values of each key.
<img src="/docs/assets/tableview.png" alt="TableView"></p>
<h3><a class="anchor" aria-hidden="true" id="configure-tableview"></a><a href="#configure-tableview" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure TableView</h3>
<p>The following is an example of how to configure a TableView.</p>
<pre><code class="hljs css language-java">TableView&lt;String&gt; tv = client.newTableViewBuilder(Schema.STRING)
.topic(<span class="hljs-string">"my-tableview"</span>)
.create();
</code></pre>
<p>You can use the available parameters in the <code>loadConf</code> configuration or related <a href="https://pulsar.apache.org/api/client/2.10.0-SNAPSHOT/org/apache/pulsar/client/api/TableViewBuilder.html">API</a> to customize your TableView.</p>
<table>
<thead>
<tr><th>Name</th><th>Type</th><th>Required?</th><th><div style="width:300px">Description</div></th><th>Default</th></tr>
</thead>
<tbody>
<tr><td><code>topic</code></td><td>string</td><td>yes</td><td>The topic name of the TableView.</td><td>N/A</td></tr>
<tr><td><code>autoUpdatePartitionInterval</code></td><td>int</td><td>no</td><td>The interval to check for newly added partitions.</td><td>60 (seconds)</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="register-listeners"></a><a href="#register-listeners" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Register listeners</h3>
<p>You can register listeners for both existing messages on a topic and new messages coming into the topic by using <code>forEachAndListen</code>, and specify to perform operations for all existing messages by using <code>forEach</code>.</p>
<p>The following is an example of how to register listeners with TableView.</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Register listeners for all existing and incoming messages</span>
tv.forEachAndListen((key, value) -&gt; <span class="hljs-comment">/*operations on all existing and incoming messages*/</span>)
<span class="hljs-comment">// Register action for all existing messages</span>
tv.forEach((key, value) -&gt; <span class="hljs-comment">/*operations on all existing messages*/</span>)
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="schema"></a><a href="#schema" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema</h2>
<p>In Pulsar, all message data consists of byte arrays &quot;under the hood.&quot; <a href="/docs/en/next/schema-get-started">Message schemas</a> enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a <a href="#producer">producer</a> without specifying a schema, then the producer can only produce messages of type <code>byte[]</code>. The following is an example.</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(topic)
.create();
</code></pre>
<p>The producer above is equivalent to a <code>Producer&lt;byte[]&gt;</code> (in fact, you should <em>always</em> explicitly specify the type). If you'd like to use a producer for a different type of data, you'll need to specify a <strong>schema</strong> that informs Pulsar which data type will be transmitted over the <a href="/docs/en/next/reference-terminology#topic">topic</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="avrobasestructschema-example"></a><a href="#avrobasestructschema-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>AvroBaseStructSchema example</h3>
<p>Let's say that you have a <code>SensorReading</code> class that you'd like to transmit over a Pulsar topic:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">SensorReading</span> </span>{
<span class="hljs-keyword">public</span> <span class="hljs-keyword">float</span> temperature;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">SensorReading</span><span class="hljs-params">(<span class="hljs-keyword">float</span> temperature)</span> </span>{
<span class="hljs-keyword">this</span>.temperature = temperature;
}
<span class="hljs-comment">// A no-arg constructor is required</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">SensorReading</span><span class="hljs-params">()</span> </span>{
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">float</span> <span class="hljs-title">getTemperature</span><span class="hljs-params">()</span> </span>{
<span class="hljs-keyword">return</span> temperature;
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setTemperature</span><span class="hljs-params">(<span class="hljs-keyword">float</span> temperature)</span> </span>{
<span class="hljs-keyword">this</span>.temperature = temperature;
}
}
</code></pre>
<p>You could then create a <code>Producer&lt;SensorReading&gt;</code> (or <code>Consumer&lt;SensorReading&gt;</code>) like this:</p>
<pre><code class="hljs css language-java">Producer&lt;SensorReading&gt; producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
</code></pre>
<p>The following schema formats are currently available for Java:</p>
<ul>
<li><p>No schema or the byte array schema (which can be applied using <code>Schema.BYTES</code>):</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; bytesProducer = client.newProducer(Schema.BYTES)
.topic(<span class="hljs-string">"some-raw-bytes-topic"</span>)
.create();
</code></pre>
<p>Or, equivalently:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; bytesProducer = client.newProducer()
.topic(<span class="hljs-string">"some-raw-bytes-topic"</span>)
.create();
</code></pre></li>
<li><p><code>String</code> for normal UTF-8-encoded string data. Apply the schema using <code>Schema.STRING</code>:</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; stringProducer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"some-string-topic"</span>)
.create();
</code></pre></li>
<li><p>Create JSON schemas for POJOs using <code>Schema.JSON</code>. The following is an example.</p>
<pre><code class="hljs css language-java">Producer&lt;MyPojo&gt; pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
.topic("some-pojo-topic")
.create();
</code></pre></li>
<li><p>Generate Protobuf schemas using <code>Schema.PROTOBUF</code>. The following example shows how to create the Protobuf schema and use it to instantiate a new producer:</p>
<pre><code class="hljs css language-java">Producer&lt;MyProtobuf&gt; protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
.topic("some-protobuf-topic")
.create();
</code></pre></li>
<li><p>Define Avro schemas with <code>Schema.AVRO</code>. The following code snippet demonstrates how to create and use Avro schema.</p>
<pre><code class="hljs css language-java">Producer&lt;MyAvro&gt; avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
.topic("some-avro-topic")
.create();
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="protobufnativeschema-example"></a><a href="#protobufnativeschema-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>ProtobufNativeSchema example</h3>
<p>For an example of ProtobufNativeSchema, see <a href="/docs/en/next/schema-understand#complex-type"><code>SchemaDefinition</code> in <code>Complex type</code></a>.</p>
<h2><a class="anchor" aria-hidden="true" id="authentication"></a><a href="#authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Authentication</h2>
<p>Pulsar currently supports three authentication schemes: <a href="/docs/en/next/security-tls-authentication">TLS</a>, <a href="/docs/en/next/security-athenz">Athenz</a>, and <a href="/docs/en/next/security-oauth2">Oauth2</a>. You can use the Pulsar Java client with all of them.</p>
<h3><a class="anchor" aria-hidden="true" id="tls-authentication"></a><a href="#tls-authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>TLS Authentication</h3>
<p>To use <a href="/docs/en/next/security-tls-authentication">TLS</a>, you need to set TLS to <code>true</code> using the <code>enableTls</code> method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.</p>
<p>The following is an example.</p>
<pre><code class="hljs css language-java">Map&lt;String, String&gt; authParams = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
authParams.put(<span class="hljs-string">"tlsCertFile"</span>, <span class="hljs-string">"/path/to/client-cert.pem"</span>);
authParams.put(<span class="hljs-string">"tlsKeyFile"</span>, <span class="hljs-string">"/path/to/client-key.pem"</span>);
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>(), <span class="hljs-title">authParams</span>)</span>;
PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar+ssl://my-broker.com:6651"</span>)
.enableTls(<span class="hljs-keyword">true</span>)
.tlsTrustCertsFilePath(<span class="hljs-string">"/path/to/cacert.pem"</span>)
.authentication(tlsAuth)
.build();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="athenz"></a><a href="#athenz" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Athenz</h3>
<p>To use <a href="/docs/en/next/security-athenz">Athenz</a> as an authentication provider, you need to <a href="#tls-authentication">use TLS</a> and provide values for four parameters in a hash:</p>
<ul>
<li><code>tenantDomain</code></li>
<li><code>tenantService</code></li>
<li><code>providerDomain</code></li>
<li><code>privateKey</code></li>
</ul>
<p>You can also set an optional <code>keyId</code>. The following is an example.</p>
<pre><code class="hljs css language-java">Map&lt;String, String&gt; authParams = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
authParams.put(<span class="hljs-string">"tenantDomain"</span>, <span class="hljs-string">"shopping"</span>); <span class="hljs-comment">// Tenant domain name</span>
authParams.put(<span class="hljs-string">"tenantService"</span>, <span class="hljs-string">"some_app"</span>); <span class="hljs-comment">// Tenant service name</span>
authParams.put(<span class="hljs-string">"providerDomain"</span>, <span class="hljs-string">"pulsar"</span>); <span class="hljs-comment">// Provider domain name</span>
authParams.put(<span class="hljs-string">"privateKey"</span>, <span class="hljs-string">"file:///path/to/private.pem"</span>); <span class="hljs-comment">// Tenant private key path</span>
authParams.put(<span class="hljs-string">"keyId"</span>, <span class="hljs-string">"v1"</span>); <span class="hljs-comment">// Key id for the tenant private key (optional, default: "0")</span>
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>(), <span class="hljs-title">authParams</span>)</span>;
PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar+ssl://my-broker.com:6651"</span>)
.enableTls(<span class="hljs-keyword">true</span>)
.tlsTrustCertsFilePath(<span class="hljs-string">"/path/to/cacert.pem"</span>)
.authentication(athenzAuth)
.build();
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="supported-pattern-formats"></a><a href="#supported-pattern-formats" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Supported pattern formats</h4>
<p>The <code>privateKey</code> parameter supports the following three pattern formats:</p>
<ul>
<li><code>file:///path/to/file</code></li>
<li><code>file:/path/to/file</code></li>
<li><code>data:application/x-pem-file;base64,&lt;base64-encoded value&gt;</code></li>
</ul>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="oauth2"></a><a href="#oauth2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Oauth2</h3>
<p>The following example shows how to use <a href="/docs/en/next/security-oauth2">Oauth2</a> as an authentication provider for the Pulsar Java client.</p>
<p>You can use the factory method to configure authentication for the Pulsar Java client.</p>
<pre><code class="hljs css language-java">PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar://broker.example.com:6650/"</span>)
.authentication(
AuthenticationFactoryOAuth2.clientCredentials(<span class="hljs-keyword">this</span>.issuerUrl, <span class="hljs-keyword">this</span>.credentialsUrl, <span class="hljs-keyword">this</span>.audience))
.build();
</code></pre>
<p>Alternatively, you can pass the authentication parameters as a JSON-encoded string to configure authentication for the Pulsar Java client.</p>
<pre><code class="hljs css language-java">Authentication auth = AuthenticationFactory
.create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"...\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://broker.example.com:6650/")
.authentication(auth)
.build();
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/next/client-libraries"><span class="arrow-prev"></span><span>Overview</span></a><a class="docs-next button" href="/docs/en/next/client-libraries-go"><span>Go</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#installation">Installation</a><ul class="toc-headings"><li><a href="#maven">Maven</a></li><li><a href="#gradle">Gradle</a></li></ul></li><li><a href="#connection-urls">Connection URLs</a></li><li><a href="#client">Client</a><ul class="toc-headings"><li><a href="#default-broker-urls-for-standalone-clusters">Default broker URLs for standalone clusters</a></li><li><a href="#client-memory-allocator-configuration">Client memory allocator configuration</a></li><li><a href="#cluster-level-failover">Cluster-level failover</a></li></ul></li><li><a href="#producer">Producer</a><ul class="toc-headings"><li><a href="#configure-producer">Configure producer</a></li><li><a href="#message-routing">Message routing</a></li><li><a href="#async-send">Async send</a></li><li><a href="#configure-messages">Configure messages</a></li><li><a href="#enable-chunking">Enable chunking</a></li></ul></li><li><a href="#consumer">Consumer</a><ul class="toc-headings"><li><a href="#configure-consumer">Configure consumer</a></li><li><a href="#async-receive">Async receive</a></li><li><a href="#batch-receive">Batch receive</a></li><li><a href="#configure-chunking">Configure chunking</a></li><li><a href="#negative-acknowledgment-redelivery-backoff">Negative acknowledgment redelivery backoff</a></li><li><a href="#acknowledgement-timeout-redelivery-backoff">Acknowledgement timeout redelivery backoff</a></li><li><a href="#multi-topic-subscriptions">Multi-topic subscriptions</a></li><li><a href="#subscription-types">Subscription types</a></li></ul></li><li><a href="#reader">Reader</a><ul class="toc-headings"><li><a 
href="#configure-reader">Configure reader</a></li><li><a href="#sticky-key-range-reader">Sticky key range reader</a></li><li><a href="#configure-chunking-1">Configure chunking</a></li><li><a href="#create-reader-with-interceptor">Create reader with interceptor</a></li></ul></li><li><a href="#tableview">TableView</a><ul class="toc-headings"><li><a href="#configure-tableview">Configure TableView</a></li><li><a href="#register-listeners">Register listeners</a></li></ul></li><li><a href="#schema">Schema</a><ul class="toc-headings"><li><a href="#avrobasestructschema-example">AvroBaseStructSchema example</a></li><li><a href="#protobufnativeschema-example">ProtobufNativeSchema example</a></li></ul></li><li><a href="#authentication">Authentication</a><ul class="toc-headings"><li><a href="#tls-authentication">TLS Authentication</a></li><li><a href="#athenz">Athenz</a></li><li><a href="#oauth2">Oauth2</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Community dropdown: swaps the contents of the nav entry that holds the
// a[href='#community'] link for a click-to-toggle dropdown menu.

// Nav entry (parent of the "#community" link), or null when the link is not
// on the page, so a missing entry cannot abort the script with a TypeError.
const community = (function () {
  const link = document.querySelector("a[href='#community']");
  return link ? link.parentNode : null;
})();

// Dropdown markup injected into the nav entry.
// - External links open in a new tab, so they carry rel="noopener" to keep
//   the opened page from reaching back through window.opener.
// - Character references are terminated with ";" (an unterminated "&#x2750"
//   is an HTML parse error).
// - aria-expanded starts as "false" to match the initial "hide" class and is
//   kept in sync by the click handler below.
const communityMenu =
  '<li>' +
  '<a id="community-menu" href="#" aria-expanded="false">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
  '<div id="community-dropdown" class="hide">' +
  '<ul id="community-dropdown-items">' +
  '<li><a href="/en/contact">Contact</a></li>' +
  '<li><a href="/en/contributing">Contributing</a></li>' +
  '<li><a href="/en/coding-guide">Coding guide</a></li>' +
  '<li><a href="/en/events">Events</a></li>' +
  '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener">Twitter &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener">Wiki &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener">Issue tracking &#x2750;</a></li>' +
  '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener">Pulsar Summit &#x2750;</a></li>' +
  '<li>&nbsp;</li>' +
  '<li><a href="/en/resources">Resources</a></li>' +
  '<li><a href="/en/team">Team</a></li>' +
  '<li><a href="/en/powered-by">Powered By</a></li>' +
  '</ul>' +
  '</div>' +
  '</li>';

if (community) {
  community.innerHTML = communityMenu;
}

// Both elements exist only if the markup above was injected (or the page
// already provides elements with these ids).
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");

if (communityMenuItem && communityDropDown) {
  communityMenuItem.addEventListener("click", function (event) {
    // The toggle's href is "#"; cancel the default navigation for it.
    event.preventDefault();
    // The dropdown's class is either 'hide' or 'visible'; flip it and mirror
    // the new state in aria-expanded.
    const wasHidden = communityDropDown.className === 'hide';
    communityDropDown.className = wasHidden ? 'visible' : 'hide';
    communityMenuItem.setAttribute("aria-expanded", wasHidden ? "true" : "false");
  });
}
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>