blob: ce67fabd1f66d7df1e7c3e3abc48dc5b8e463daf [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>The Pulsar Java client · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Pulsar Java client can be used both to create Java producers, consumers, and [readers](#reader-interface) of messages and to perform [administrative tasks](/docs/en/2.4.2/admin-api-overview). The current version of the Java client is **2.4.2**."/><meta name="docsearch:version" content="2.4.2"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="The Pulsar Java client · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Pulsar Java client can be used both to create Java producers, consumers, and [readers](#reader-interface) of messages and to perform [administrative tasks](/docs/en/2.4.2/admin-api-overview). 
The current version of the Java client is **2.4.2**."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.4.2</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.4.2/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.4.2/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" 
target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.4.2/client-libraries-java">日本語</a></li><li><a href="/docs/fr/2.4.2/client-libraries-java">Français</a></li><li><a href="/docs/ko/2.4.2/client-libraries-java">한국어</a></li><li><a href="/docs/zh-CN/2.4.2/client-libraries-java">中文</a></li><li><a href="/docs/zh-TW/2.4.2/client-libraries-java">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language picker: a click on the menu entry flips the dropdown's class
// between "hide" and "visible" and suppresses the link's default navigation.
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
  event.preventDefault();
  // Any class other than exactly "hide" is treated as "currently shown".
  languagesDropDown.className =
    languagesDropDown.className === "hide" ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-authentication">Authentication and Authorization</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a 
class="navItem" href="/docs/en/2.4.2/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-cdc">CDC Connector</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/sql-getting-started">Get Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/en/2.4.2/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-encryption">End-to-End 
Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.4.2/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-schemas">Schemas</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.4.2/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.4.2/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-connector-admin">Connector Admin CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
// Collapsible sidebar categories.
// For every element with class "collapsible":
//   * the first category (in document order) whose following sibling contains
//     the active nav item has its 'hide' / 'rotate' classes toggled once;
//   * a click handler is attached that toggles the same two classes.
var coll = document.getElementsByClassName('collapsible');
// Stays true until the category holding the active item has been handled.
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
  var links = coll[i].nextElementSibling.getElementsByTagName('*');
  if (checkActiveCategory) {
    for (var j = 0; j < links.length; j++) {
      if (!links[j].classList.contains('navListItemActive')) {
        continue;
      }
      coll[i].nextElementSibling.classList.toggle('hide');
      coll[i].childNodes[1].classList.toggle('rotate');
      checkActiveCategory = false;
      break;
    }
  }
  coll[i].addEventListener('click', function() {
    // childNodes[1] is toggled as the arrow; the next sibling is the content list.
    this.childNodes[1].classList.toggle('rotate');
    this.nextElementSibling.classList.toggle('hide');
  });
}
// Once the DOM is parsed, wire up the mobile navigation togglers and make
// clicks on links inside the on-page TOC close the TOC overlay.
document.addEventListener('DOMContentLoaded', function() {
  // Makes a click on the element matching `togglerSelector` toggle `className`
  // on the element matching `targetSelector`. Does nothing when the toggler
  // element is not present in the document.
  function createToggler(togglerSelector, targetSelector, className) {
    var trigger = document.querySelector(togglerSelector);
    var subject = document.querySelector(targetSelector);
    if (trigger) {
      trigger.onclick = function(event) {
        event.preventDefault();
        subject.classList.toggle(className);
      };
    }
  }

  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');

  var headings = document.querySelector('.toc-headings');
  if (headings) {
    headings.addEventListener('click', function(event) {
      // Walk from the clicked node up to (not including) the headings
      // container; if an <a> is found on the way, drop 'tocActive' from body.
      for (var node = event.target; node !== headings; node = node.parentNode) {
        if (node.tagName === 'A') {
          document.body.classList.remove('tocActive');
          break;
        }
      }
    }, false);
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-java.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">The Pulsar Java client</h1></header><article><div><span><p>The Pulsar Java client can be used both to create Java producers, consumers, and <a href="#reader-interface">readers</a> of messages and to perform <a href="/docs/en/2.4.2/admin-api-overview">administrative tasks</a>. The current version of the Java client is <strong>2.4.2</strong>.</p>
<p>Javadoc for the Pulsar client is divided up into two domains, by package:</p>
<table>
<thead>
<tr><th style="text-align:left">Package</th><th style="text-align:left">Description</th><th style="text-align:left">Maven Artifact</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="/api/client/2.4.2"><code>org.apache.pulsar.client.api</code></a></td><td style="text-align:left">The producer and consumer API</td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C2.4.2%7Cjar">org.apache.pulsar:pulsar-client:2.4.2</a></td></tr>
<tr><td style="text-align:left"><a href="/api/admin/2.4.2"><code>org.apache.pulsar.client.admin</code></a></td><td style="text-align:left">The Java <a href="/docs/en/2.4.2/admin-api-overview">admin API</a></td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client-admin%7C2.4.2%7Cjar">org.apache.pulsar:pulsar-client-admin:2.4.2</a></td></tr>
</tbody>
</table>
<p>This document will focus only on the client API for producing and consuming messages on Pulsar topics. For a guide to using the Java admin client, see <a href="/docs/en/2.4.2/admin-api-overview">The Pulsar admin interface</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="installation"></a><a href="#installation" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installation</h2>
<p>The latest version of the Pulsar Java client library is available via <a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C2.4.2%7Cjar">Maven Central</a>. To use the latest version, add the <code>pulsar-client</code> library to your build configuration.</p>
<h3><a class="anchor" aria-hidden="true" id="maven"></a><a href="#maven" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Maven</h3>
<p>If you're using Maven, add this to your <code>pom.xml</code>:</p>
<pre><code class="hljs css language-xml"><span class="hljs-comment">&lt;!-- in your &lt;properties&gt; block --&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">pulsar.version</span>&gt;</span>2.4.2<span class="hljs-tag">&lt;/<span class="hljs-name">pulsar.version</span>&gt;</span>
<span class="hljs-comment">&lt;!-- in your &lt;dependencies&gt; block --&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-client<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>${pulsar.version}<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="gradle"></a><a href="#gradle" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Gradle</h3>
<p>If you're using Gradle, add this to your <code>build.gradle</code> file:</p>
<pre><code class="hljs css language-groovy"><span class="hljs-keyword">def</span> pulsarVersion = <span class="hljs-string">'2.4.2'</span>
dependencies {
compile <span class="hljs-string">group:</span> <span class="hljs-string">'org.apache.pulsar'</span>, <span class="hljs-string">name:</span> <span class="hljs-string">'pulsar-client'</span>, <span class="hljs-string">version:</span> pulsarVersion
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="connection-urls"></a><a href="#connection-urls" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connection URLs</h2>
<p>To connect to Pulsar using client libraries, you need to specify a <a href="/docs/en/2.4.2/developing-binary-protocol">Pulsar protocol</a> URL.</p>
<p>Pulsar protocol URLs are assigned to specific clusters, use the <code>pulsar</code> scheme and have a default port of 6650. Here's an example for <code>localhost</code>:</p>
<pre><code class="hljs css language-http">pulsar://localhost:6650
</code></pre>
<p>If you have more than one broker, the URL may look like this:</p>
<pre><code class="hljs css language-http">pulsar://localhost:6650,localhost:6651,localhost:6652
</code></pre>
<p>A URL for a production Pulsar cluster may look something like this:</p>
<pre><code class="hljs css language-http">pulsar://pulsar.us-west.example.com:6650
</code></pre>
<p>If you're using <a href="/docs/en/2.4.2/security-tls-authentication">TLS</a> authentication, the URL will look something like this:</p>
<pre><code class="hljs css language-http">pulsar+ssl://pulsar.us-west.example.com:6651
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="client-configuration"></a><a href="#client-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client configuration</h2>
<p>You can instantiate a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object using just a URL for the target Pulsar <a href="/docs/en/2.4.2/reference-terminology#cluster">cluster</a>, like this:</p>
<pre><code class="hljs css language-java">PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar://localhost:6650"</span>)
.build();
</code></pre>
<p>If you have multiple brokers, you can instantiate a PulsarClient like this:</p>
<pre><code class="hljs css language-java">PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar://localhost:6650,localhost:6651,localhost:6652"</span>)
.build();
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="default-broker-urls-for-standalone-clusters"></a><a href="#default-broker-urls-for-standalone-clusters" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Default broker URLs for standalone clusters</h4>
<p>If you're running a cluster in <a href="/docs/en/2.4.2/getting-started-standalone">standalone mode</a>, the broker will be available at the <code>pulsar://localhost:6650</code> URL by default.</p>
</blockquote>
<p>If you create a client, you may use the <code>loadConf</code> configuration. Below are the available parameters used in <code>loadConf</code>.</p>
<table>
<thead>
<tr><th>Type</th><th>Name</th><th>Description</th><th>Default</th></tr>
</thead>
<tbody>
<tr><td>String</td><td><code>serviceUrl</code></td><td>Service URL provider for Pulsar service</td><td>None</td></tr>
<tr><td>String</td><td><code>authPluginClassName</code></td><td>Name of the authentication plugin</td><td>None</td></tr>
<tr><td>String</td><td><code>authParams</code></td><td>String represents parameters for the authentication plugin <br><strong>Example</strong><br> key1:val1,key2:val2</td><td>None</td></tr>
<tr><td>long</td><td><code>operationTimeoutMs</code></td><td>Operation timeout</td><td>30000</td></tr>
<tr><td>long</td><td><code>statsIntervalSeconds</code></td><td>Interval between each stat info<br>Stats is activated with positive <code>statsInterval</code><br><code>statsIntervalSeconds</code> should be set to 1 second at least</td><td>60</td></tr>
<tr><td>int</td><td><code>numIoThreads</code></td><td>Number of threads used for handling connections to brokers</td><td>1</td></tr>
<tr><td>int</td><td><code>numListenerThreads</code></td><td>Number of threads used for handling message listeners</td><td>1</td></tr>
<tr><td>boolean</td><td><code>useTcpNoDelay</code></td><td>Whether to use TCP no-delay flag on the connection to disable Nagle algorithm</td><td>true</td></tr>
<tr><td>boolean</td><td><code>useTls</code></td><td>Whether to use TLS encryption on the connection</td><td>false</td></tr>
<tr><td>String</td><td><code>tlsTrustCertsFilePath</code></td><td>Path to the trusted TLS certificate file</td><td>None</td></tr>
<tr><td>boolean</td><td><code>tlsAllowInsecureConnection</code></td><td>Whether the Pulsar client accepts untrusted TLS certificate from broker</td><td>false</td></tr>
<tr><td>boolean</td><td><code>tlsHostnameVerificationEnable</code></td><td>Whether to enable TLS hostname verification</td><td>false</td></tr>
<tr><td>int</td><td><code>concurrentLookupRequest</code></td><td>Number of concurrent lookup requests allowed to send on each broker connection to prevent overload on broker</td><td>5000</td></tr>
<tr><td>int</td><td><code>maxLookupRequest</code></td><td>Maximum number of lookup requests allowed on each broker connection to prevent overload on broker</td><td>50000</td></tr>
<tr><td>int</td><td><code>maxNumberOfRejectedRequestPerConnection</code></td><td>Maximum number of requests a broker may reject within a certain time frame (30 seconds); once this limit is exceeded, the current connection is closed and the client creates a new connection to a different broker</td><td>50</td></tr>
<tr><td>int</td><td><code>keepAliveIntervalSeconds</code></td><td>Keep-alive interval, in seconds, for each client-broker connection</td><td>30</td></tr>
<tr><td>int</td><td><code>connectionTimeoutMs</code></td><td>Duration of waiting for a connection to a broker to be established <br>If the duration passes without a response from a broker, the connection attempt is dropped</td><td>10000</td></tr>
<tr><td>int</td><td><code>requestTimeoutMs</code></td><td>Maximum duration for completing a request</td><td>60000</td></tr>
<tr><td>int</td><td><code>defaultBackoffIntervalNanos</code></td><td>Default duration for a backoff interval</td><td>TimeUnit.MILLISECONDS.toNanos(100)</td></tr>
<tr><td>long</td><td><code>maxBackoffIntervalNanos</code></td><td>Maximum duration for a backoff interval</td><td>TimeUnit.SECONDS.toNanos(30)</td></tr>
</tbody>
</table>
<p>Check out the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
class for a full listing of configurable parameters.</p>
<blockquote>
<p>In addition to client-level configuration, you can also apply <a href="#configuring-producers">producer</a> and <a href="#configuring-consumers">consumer</a> specific configuration, as you'll see in the sections below.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="producers"></a><a href="#producers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producers</h2>
<p>In Pulsar, producers write messages to topics. Once you've instantiated a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object (as in the section <a href="#client-configuration">above</a>), you can create a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/Producer">Producer</a>
for a specific Pulsar <a href="/docs/en/2.4.2/reference-terminology#topic">topic</a>.</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.create();
<span class="hljs-comment">// You can then send messages to the broker and topic you specified:</span>
producer.send(<span class="hljs-string">"My message"</span>.getBytes());
</code></pre>
<p>By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message <a href="#schemas">schema</a>.</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; stringProducer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"my-topic"</span>)
.create();
stringProducer.send(<span class="hljs-string">"My message"</span>);
</code></pre>
<blockquote>
<p>You should always make sure to close your producers, consumers, and clients when they are no longer needed:</p>
<pre><code class="hljs css language-java">producer.close();
consumer.close();
client.close();
</code></pre>
<p>Close operations can also be asynchronous:</p>
<pre><code class="hljs css language-java">producer.closeAsync()
.thenRun(() -&gt; System.out.println(<span class="hljs-string">"Producer closed"</span>))
.exceptionally((ex) -&gt; {
System.err.println(<span class="hljs-string">"Failed to close producer: "</span> + ex);
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
});
</code></pre>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="configuring-producers"></a><a href="#configuring-producers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuring producers</h3>
<p>If you instantiate a <code>Producer</code> object specifying only a topic name, as in the example above, the producer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set.</p>
<p>For a full listing, see the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/ProducerBuilder">ProducerBuilder</a>
class. Here's an example:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.batchingMaxPublishDelay(<span class="hljs-number">10</span>, TimeUnit.MILLISECONDS)
.sendTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.blockIfQueueFull(<span class="hljs-keyword">true</span>)
.create();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="message-routing"></a><a href="#message-routing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Message routing</h3>
<p>When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more on specifying a routing mode using the Java client, see the <a href="/docs/en/2.4.2/cookbooks-partitioned">Partitioned Topics</a> cookbook.</p>
<h3><a class="anchor" aria-hidden="true" id="async-send"></a><a href="#async-send" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Async send</h3>
<p>You can also publish messages <a href="/docs/en/2.4.2/concepts-messaging#send-modes">asynchronously</a> using the Java client. With async send, the producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size configurable), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.</p>
<p>Here's an example async send operation:</p>
<pre><code class="hljs css language-java">producer.sendAsync(<span class="hljs-string">"my-async-message"</span>.getBytes()).thenAccept(msgId -&gt; {
System.out.printf(<span class="hljs-string">"Message with ID %s successfully sent"</span>, msgId);
});
</code></pre>
<p>As you can see from the example above, async send operations return a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/MessageId">MessageId</a>
wrapped in a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a>.</p>
<h3><a class="anchor" aria-hidden="true" id="configuring-messages"></a><a href="#configuring-messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuring messages</h3>
<p>In addition to a value, it's possible to set additional items on a given message:</p>
<pre><code class="hljs css language-java">producer.newMessage()
.key(<span class="hljs-string">"my-message-key"</span>)
.value(<span class="hljs-string">"my-async-message"</span>.getBytes())
.property(<span class="hljs-string">"my-key"</span>, <span class="hljs-string">"my-value"</span>)
.property(<span class="hljs-string">"my-other-key"</span>, <span class="hljs-string">"my-other-value"</span>)
.send();
</code></pre>
<p>As for the previous case, it's also possible to terminate the builder chain with <code>sendAsync()</code> and
get a future returned.</p>
<h2><a class="anchor" aria-hidden="true" id="consumers"></a><a href="#consumers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumers</h2>
<p>In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new <a href="/docs/en/2.4.2/reference-terminology#consumer">consumer</a> by first instantiating a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object and passing it a URL for a Pulsar broker (as <a href="#client-configuration">above</a>).</p>
<p>Once you've instantiated a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object, you can create a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/Consumer">Consumer</a>
by specifying a <a href="/docs/en/2.4.2/reference-terminology#topic">topic</a> and a <a href="/docs/en/2.4.2/concepts-messaging#subscription-modes">subscription</a>.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscribe();
</code></pre>
<p>The <code>subscribe</code> method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a <code>while</code> loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then <a href="/docs/en/2.4.2/reference-terminology#acknowledgment-ack">acknowledges</a> that the message has been processed. If the processing logic fails, we use <a href="/docs/en/2.4.2/reference-terminology#acknowledgment-ack">negative acknowledgement</a>
to have the message redelivered at a later point in time.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
<span class="hljs-comment">// Wait for a message</span>
Message msg = consumer.receive();
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">// Do something with the message</span>
System.out.printf(<span class="hljs-string">"Message received: %s"</span>, <span class="hljs-keyword">new</span> String(msg.getData()));
<span class="hljs-comment">// Acknowledge the message so that it can be deleted by the message broker</span>
consumer.acknowledge(msg);
} <span class="hljs-keyword">catch</span> (Exception e) {
<span class="hljs-comment">// Message failed to process, redeliver later</span>
consumer.negativeAcknowledge(msg);
}
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="configuring-consumers"></a><a href="#configuring-consumers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuring consumers</h3>
<p>If you instantiate a <code>Consumer</code> object specifying only a topic and subscription name, as in the example above, the consumer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/ConsumerBuilder">ConsumerBuilder</a>
class.</p>
<p>Here's an example configuration:</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.ackTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="async-receive"></a><a href="#async-receive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Async receive</h3>
<p>The <code>receive</code> method will receive messages synchronously (the consumer process will be blocked until a message is available). You can also use <a href="/docs/en/2.4.2/concepts-messaging#receive-modes">async receive</a>, which will return immediately with a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a> object that completes once a new message is available.</p>
<p>Here's an example:</p>
<pre><code class="hljs css language-java">CompletableFuture&lt;Message&gt; asyncMessage = consumer.receiveAsync();
</code></pre>
<p>Async receive operations return a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/Message">Message</a>
wrapped inside of a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a>.</p>
<h3><a class="anchor" aria-hidden="true" id="multi-topic-subscriptions"></a><a href="#multi-topic-subscriptions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Multi-topic subscriptions</h3>
<p>In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using <a href="/docs/en/2.4.2/concepts-messaging#multi-topic-subscriptions">multi-topic subscriptions</a>. To use multi-topic subscriptions you can supply either a regular expression (regex) or a <code>List</code> of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.</p>
<p>Here are some examples:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.client.api.Consumer;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.PulsarClient;
<span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> java.util.List;
<span class="hljs-keyword">import</span> java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
<span class="hljs-comment">// Subscribe to all topics in a namespace</span>
Pattern allTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default/.*"</span>);
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
<span class="hljs-comment">// Subscribe to a subset of topics in a namespace, based on regex</span>
Pattern someTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default/foo.*"</span>);
Consumer someTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
</code></pre>
<p>You can also subscribe to an explicit list of topics (across namespaces if you wish):</p>
<pre><code class="hljs css language-java">List&lt;String&gt; topics = Arrays.asList(
<span class="hljs-string">"topic-1"</span>,
<span class="hljs-string">"topic-2"</span>,
<span class="hljs-string">"topic-3"</span>
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
<span class="hljs-comment">// Alternatively:</span>
Consumer multiTopicConsumer = consumerBuilder
.topics(
<span class="hljs-string">"topic-1"</span>,
<span class="hljs-string">"topic-2"</span>,
<span class="hljs-string">"topic-3"</span>
)
.subscribe();
</code></pre>
<p>You can also subscribe to multiple topics asynchronously using the <code>subscribeAsync</code> method rather than the synchronous <code>subscribe</code> method. Here's an example:</p>
<pre><code class="hljs css language-java">Pattern allTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default/.*"</span>);
consumerBuilder
.topics(topics)
.subscribeAsync()
.thenAccept(<span class="hljs-keyword">this</span>::receiveMessageFromConsumer);
<span class="hljs-function"><span class="hljs-keyword">private</span> <span class="hljs-keyword">void</span> <span class="hljs-title">receiveMessageFromConsumer</span><span class="hljs-params">(Consumer consumer)</span> </span>{
consumer.receiveAsync().thenAccept(message -&gt; {
<span class="hljs-comment">// Do something with the received message</span>
receiveMessageFromConsumer(consumer);
});
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="subscription-types"></a><a href="#subscription-types" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Subscription types</h3>
<p>Pulsar has various <a href="concepts-messaging#subscription-types">subscription types</a> to match different scenarios. A topic can have multiple subscriptions with different subscription types. However, a subscription can only have one subscription type at a time.</p>
<p>A subscription is identified with the subscription name; a subscription name can specify only one subscription type at a time. To change the subscription type, you should first stop all consumers of this subscription.</p>
<p>Different subscription types have different message distribution modes. This section describes the differences of subscription types and how to use them.</p>
<p>To better describe their differences, assume you have a topic named &quot;my-topic&quot;, and the producer has published 10 messages.</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; producer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"my-topic"</span>)
.enableBatching(<span class="hljs-keyword">false</span>)
.create();
<span class="hljs-comment">// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"</span>
producer.newMessage().key(<span class="hljs-string">"key-1"</span>).value(<span class="hljs-string">"message-1-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-1"</span>).value(<span class="hljs-string">"message-1-2"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-1"</span>).value(<span class="hljs-string">"message-1-3"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-2"</span>).value(<span class="hljs-string">"message-2-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-2"</span>).value(<span class="hljs-string">"message-2-2"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-2"</span>).value(<span class="hljs-string">"message-2-3"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-3"</span>).value(<span class="hljs-string">"message-3-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-3"</span>).value(<span class="hljs-string">"message-3-2"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-4"</span>).value(<span class="hljs-string">"message-4-1"</span>).send();
producer.newMessage().key(<span class="hljs-string">"key-4"</span>).value(<span class="hljs-string">"message-4-2"</span>).send();
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="exclusive"></a><a href="#exclusive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Exclusive</h4>
<p>Create a new consumer and subscribe with the <code>Exclusive</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
</code></pre>
<p>Only the first consumer is allowed to attach to the subscription; other consumers receive an error. The first consumer receives all 10 messages, and the consuming order is the same as the producing order.</p>
<blockquote>
<p><strong>Note</strong> <br>
If the topic is a partitioned topic, the first consumer subscribes to all of its partitions; other consumers are not assigned any partitions and receive an error.</p>
</blockquote>
<h4><a class="anchor" aria-hidden="true" id="failover"></a><a href="#failover" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Failover</h4>
<p>Create new consumers and subscribe with the <code>Failover</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer1 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
<span class="hljs-comment">//consumer1 is the active consumer, consumer2 is the standby consumer.</span>
<span class="hljs-comment">//consumer1 receives 5 messages and then crashes, consumer2 takes over as an active consumer.</span>
</code></pre>
<p>Multiple consumers can attach to the same subscription, yet only the first consumer is active, and the others are standby. When the active consumer is disconnected, messages are dispatched to one of the standby consumers, and that standby consumer becomes the active consumer.</p>
<p>If the first active consumer receives 5 messages and is then disconnected, the standby consumer becomes the active consumer. In that case, consumer1 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-1"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-2"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-3"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-1"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-2"</span>)
</code></pre>
<p>consumer2 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-3"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-1"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-2"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-1"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-2"</span>)
</code></pre>
<blockquote>
<p><strong>Note</strong> <br>
If a topic is a partitioned topic, each partition has only one active consumer: messages of one partition are distributed to only one consumer, and messages of multiple partitions are distributed to multiple consumers.</p>
</blockquote>
<h4><a class="anchor" aria-hidden="true" id="shared"></a><a href="#shared" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Shared</h4>
<p>Create new consumers and subscribe with <code>Shared</code> subscription type.</p>
<pre><code class="hljs css language-java">Consumer consumer1 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Shared)
.subscribe();
<span class="hljs-comment">//Both consumer1 and consumer2 are active consumers.</span>
</code></pre>
<p>In the shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round robin distribution across consumers.</p>
<p>If a broker dispatches only one message at a time, consumer1 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-1"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-3"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-2"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-1"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-1"</span>)
</code></pre>
<p>consumer2 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-2"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-1"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-3"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-2"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-2"</span>)
</code></pre>
<p><code>Shared</code> subscription is different from <code>Exclusive</code> and <code>Failover</code> subscription types. <code>Shared</code> subscription has better flexibility, but cannot provide order guarantee.</p>
<h4><a class="anchor" aria-hidden="true" id="key_shared"></a><a href="#key_shared" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Key_Shared</h4>
<p>This is a new subscription type since the 2.4.0 release. Create new consumers and subscribe with the <code>Key_Shared</code> subscription type:</p>
<pre><code class="hljs css language-java">Consumer consumer1 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
<span class="hljs-comment">//Both consumer1 and consumer2 are active consumers.</span>
</code></pre>
<p><code>Key_Shared</code> subscription is like <code>Shared</code> subscription in that all consumers can attach to the same subscription. Unlike <code>Shared</code> subscription, however, messages with the same key are delivered to only one consumer, in order. The following is one possible distribution of messages between the consumers (by default, you do not know in advance which keys will be assigned to which consumer, but a key is assigned to only one consumer at a time).</p>
<p>consumer1 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-1"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-2"</span>)
(<span class="hljs-string">"key-1"</span>, <span class="hljs-string">"message-1-3"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-1"</span>)
(<span class="hljs-string">"key-3"</span>, <span class="hljs-string">"message-3-2"</span>)
</code></pre>
<p>consumer2 will receive:</p>
<pre><code class="hljs">(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-1"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-2"</span>)
(<span class="hljs-string">"key-2"</span>, <span class="hljs-string">"message-2-3"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-1"</span>)
(<span class="hljs-string">"key-4"</span>, <span class="hljs-string">"message-4-2"</span>)
</code></pre>
<blockquote>
<p><strong>Note</strong> <br>
If the message key is not specified, messages without key will be dispatched to one consumer in order by default.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="reader-interface"></a><a href="#reader-interface" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader interface</h2>
<p>With the <a href="/docs/en/2.4.2/concepts-clients#reader-interface">reader interface</a>, Pulsar clients can &quot;manually position&quot; themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/Reader">Reader</a>
objects by specifying a topic and a <a href="https://pulsar.apache.org/api/client/2.4.2/org/apache/pulsar/client/api/MessageId">MessageId</a>.</p>
<p>Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">byte</span>[] msgIdBytes = <span class="hljs-comment">// Some message ID byte array</span>
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
<span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
Message message = reader.readNext();
<span class="hljs-comment">// Process message</span>
}
</code></pre>
<p>In the example above, a <code>Reader</code> object is instantiated for a specific topic and message (by ID); the reader then iterates over each message in the topic after the message identified by <code>msgIdBytes</code> (how that value is obtained depends on the application).</p>
<p>The code sample above shows pointing the <code>Reader</code> object to a specific message (by ID), but you can also use <code>MessageId.earliest</code> to point to the earliest available message on the topic or <code>MessageId.latest</code> to point to the most recent available message.</p>
<h2><a class="anchor" aria-hidden="true" id="schemas"></a><a href="#schemas" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schemas</h2>
<p>In Pulsar, all message data consists of byte arrays &quot;under the hood.&quot; <a href="/docs/en/2.4.2/schema-get-started">Message schemas</a> enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a <a href="#producers">producer</a> without specifying a schema, then the producer can only produce messages of type <code>byte[]</code>. Here's an example:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(topic)
.create();
</code></pre>
<p>The producer above is equivalent to a <code>Producer&lt;byte[]&gt;</code> (in fact, you should <em>always</em> explicitly specify the type). If you'd like to use a producer for a different type of data, you'll need to specify a <strong>schema</strong> that informs Pulsar which data type will be transmitted over the <a href="/docs/en/2.4.2/reference-terminology#topic">topic</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="schema-example"></a><a href="#schema-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema example</h3>
<p>Let's say that you have a <code>SensorReading</code> class that you'd like to transmit over a Pulsar topic:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">SensorReading</span> </span>{
<span class="hljs-keyword">public</span> <span class="hljs-keyword">float</span> temperature;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">SensorReading</span><span class="hljs-params">(<span class="hljs-keyword">float</span> temperature)</span> </span>{
<span class="hljs-keyword">this</span>.temperature = temperature;
}
<span class="hljs-comment">// A no-arg constructor is required</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">SensorReading</span><span class="hljs-params">()</span> </span>{
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">float</span> <span class="hljs-title">getTemperature</span><span class="hljs-params">()</span> </span>{
<span class="hljs-keyword">return</span> temperature;
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setTemperature</span><span class="hljs-params">(<span class="hljs-keyword">float</span> temperature)</span> </span>{
<span class="hljs-keyword">this</span>.temperature = temperature;
}
}
</code></pre>
<p>You could then create a <code>Producer&lt;SensorReading&gt;</code> (or <code>Consumer&lt;SensorReading&gt;</code>) like so:</p>
<pre><code class="hljs css language-java">Producer&lt;SensorReading&gt; producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
</code></pre>
<p>The following schema formats are currently available for Java:</p>
<ul>
<li><p>No schema or the byte array schema (which can be applied using <code>Schema.BYTES</code>):</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; bytesProducer = client.newProducer(Schema.BYTES)
.topic(<span class="hljs-string">"some-raw-bytes-topic"</span>)
.create();
</code></pre>
<p>Or, equivalently:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; bytesProducer = client.newProducer()
.topic(<span class="hljs-string">"some-raw-bytes-topic"</span>)
.create();
</code></pre></li>
<li><p><code>String</code> for normal UTF-8-encoded string data. This schema can be applied using <code>Schema.STRING</code>:</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; stringProducer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"some-string-topic"</span>)
.create();
</code></pre></li>
<li><p>JSON schemas can be created for POJOs using <code>Schema.JSON</code>. Here's an example:</p>
<pre><code class="hljs css language-java">Producer&lt;MyPojo&gt; pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
.topic("some-pojo-topic")
.create();
</code></pre></li>
<li><p>Protobuf schemas can be generated using <code>Schema.PROTOBUF</code>. The following example shows how to create the Protobuf schema and use it to instantiate a new producer:</p>
<pre><code class="hljs css language-java">Producer&lt;MyProtobuf&gt; protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
.topic("some-protobuf-topic")
.create();
</code></pre></li>
<li><p>Avro schemas can be defined with the help of <code>Schema.AVRO</code>. The next code snippet demonstrates the creation and usage of the Avro schema:</p>
<pre><code class="hljs css language-java">Producer&lt;MyAvro&gt; avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
.topic("some-avro-topic")
.create();
</code></pre></li>
</ul>
<h2><a class="anchor" aria-hidden="true" id="authentication"></a><a href="#authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Authentication</h2>
<p>Pulsar currently supports two authentication schemes: <a href="/docs/en/2.4.2/security-tls-authentication">TLS</a> and <a href="/docs/en/2.4.2/security-athenz">Athenz</a>. The Pulsar Java client can be used with both.</p>
<h3><a class="anchor" aria-hidden="true" id="tls-authentication"></a><a href="#tls-authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>TLS Authentication</h3>
<p>To use <a href="/docs/en/2.4.2/security-tls-authentication">TLS</a>, you need to set TLS to <code>true</code> using the <code>enableTls</code> method, point your Pulsar client to a trusted TLS cert path using <code>tlsTrustCertsFilePath</code>, and provide paths to the client cert and key files.</p>
<p>Here's an example configuration:</p>
<pre><code class="hljs css language-java">Map&lt;String, String&gt; authParams = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
authParams.put(<span class="hljs-string">"tlsCertFile"</span>, <span class="hljs-string">"/path/to/client-cert.pem"</span>);
authParams.put(<span class="hljs-string">"tlsKeyFile"</span>, <span class="hljs-string">"/path/to/client-key.pem"</span>);
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls.<span class="hljs-keyword">class</span>.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar+ssl://my-broker.com:6651"</span>)
.enableTls(<span class="hljs-keyword">true</span>)
.tlsTrustCertsFilePath(<span class="hljs-string">"/path/to/cacert.pem"</span>)
.authentication(tlsAuth)
.build();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="athenz"></a><a href="#athenz" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Athenz</h3>
<p>To use <a href="/docs/en/2.4.2/security-athenz">Athenz</a> as an authentication provider, you need to <a href="#tls-authentication">use TLS</a> and provide values for four parameters in a hash:</p>
<ul>
<li><code>tenantDomain</code></li>
<li><code>tenantService</code></li>
<li><code>providerDomain</code></li>
<li><code>privateKey</code></li>
</ul>
<p>You can also set an optional <code>keyId</code>. Here's an example configuration:</p>
<pre><code class="hljs css language-java">Map&lt;String, String&gt; authParams = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
authParams.put(<span class="hljs-string">"tenantDomain"</span>, <span class="hljs-string">"shopping"</span>); <span class="hljs-comment">// Tenant domain name</span>
authParams.put(<span class="hljs-string">"tenantService"</span>, <span class="hljs-string">"some_app"</span>); <span class="hljs-comment">// Tenant service name</span>
authParams.put(<span class="hljs-string">"providerDomain"</span>, <span class="hljs-string">"pulsar"</span>); <span class="hljs-comment">// Provider domain name</span>
authParams.put(<span class="hljs-string">"privateKey"</span>, <span class="hljs-string">"file:///path/to/private.pem"</span>); <span class="hljs-comment">// Tenant private key path</span>
authParams.put(<span class="hljs-string">"keyId"</span>, <span class="hljs-string">"v1"</span>); <span class="hljs-comment">// Key id for the tenant private key (optional, default: "0")</span>
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz.<span class="hljs-keyword">class</span>.getName(), authParams);
PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar+ssl://my-broker.com:6651"</span>)
.enableTls(<span class="hljs-keyword">true</span>)
.tlsTrustCertsFilePath(<span class="hljs-string">"/path/to/cacert.pem"</span>)
.authentication(athenzAuth)
.build();
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="supported-pattern-formats"></a><a href="#supported-pattern-formats" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Supported pattern formats</h4>
<p>The <code>privateKey</code> parameter supports the following three pattern formats:</p>
<ul>
<li><code>file:///path/to/file</code></li>
<li><code>file:/path/to/file</code></li>
<li><code>data:application/x-pem-file;base64,&lt;base64-encoded value&gt;</code></li>
</ul>
</blockquote>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.4.2/security-extending"><span class="arrow-prev"></span><span>Extend Authentication and Authorization</span></a><a class="docs-next button" href="/docs/en/2.4.2/client-libraries-go"><span>Go</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#installation">Installation</a><ul class="toc-headings"><li><a href="#maven">Maven</a></li><li><a href="#gradle">Gradle</a></li></ul></li><li><a href="#connection-urls">Connection URLs</a></li><li><a href="#client-configuration">Client configuration</a></li><li><a href="#producers">Producers</a><ul class="toc-headings"><li><a href="#configuring-producers">Configuring producers</a></li><li><a href="#message-routing">Message routing</a></li><li><a href="#async-send">Async send</a></li><li><a href="#configuring-messages">Configuring messages</a></li></ul></li><li><a href="#consumers">Consumers</a><ul class="toc-headings"><li><a href="#configuring-consumers">Configuring consumers</a></li><li><a href="#async-receive">Async receive</a></li><li><a href="#multi-topic-subscriptions">Multi-topic subscriptions</a></li><li><a href="#subscription-types">Subscription types</a></li></ul></li><li><a href="#reader-interface">Reader interface</a></li><li><a href="#schemas">Schemas</a><ul class="toc-headings"><li><a href="#schema-example">Schema example</a></li></ul></li><li><a href="#authentication">Authentication</a><ul class="toc-headings"><li><a href="#tls-authentication">TLS Authentication</a></li><li><a href="#athenz">Athenz</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Community dropdown: replaces the contents of the placeholder "Community"
// nav entry (the parent of the a[href='#community'] link) with a
// click-to-toggle dropdown menu.

// Resolve the placeholder's parent without throwing when the link is absent;
// `community` is null in that case and the rest of the script becomes a no-op.
const community =
  (document.querySelector("a[href='#community']") || {}).parentNode || null;

// Markup for the dropdown. External links open in a new tab, so they carry
// rel="noopener noreferrer" to keep the opened page from reaching
// window.opener. The &#x2750; glyph marks links that leave the site.
const communityMenu =
  '<li>' +
  '<a id="community-menu" href="#" aria-expanded="false">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
  '<div id="community-dropdown" class="hide">' +
  '<ul id="community-dropdown-items">' +
  '<li><a href="/en/contact">Contact</a></li>' +
  '<li><a href="/en/contributing">Contributing</a></li>' +
  '<li><a href="/en/coding-guide">Coding guide</a></li>' +
  '<li><a href="/en/events">Events</a></li>' +
  '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
  '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
  '<li>&nbsp;</li>' +
  '<li><a href="/en/resources">Resources</a></li>' +
  '<li><a href="/en/team">Team</a></li>' +
  '<li><a href="/en/powered-by">Powered By</a></li>' +
  '</ul>' +
  '</div>' +
  '</li>';

if (community) {
  community.innerHTML = communityMenu;
}

// Looked up after the markup is injected; both are null if injection was skipped.
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");

if (communityMenuItem && communityDropDown) {
  communityMenuItem.addEventListener("click", function(event) {
    // The trigger's href is "#"; suppress the jump to the top of the page.
    event.preventDefault();
    // The dropdown's class is swapped wholesale between 'hide' and 'visible'.
    const wasHidden = communityDropDown.className === 'hide';
    communityDropDown.className = wasHidden ? 'visible' : 'hide';
    // Keep the announced expanded/collapsed state in sync with the class.
    communityMenuItem.setAttribute('aria-expanded', String(wasHidden));
  });
}
</script></span></footer></div><!-- Twitter widgets loader: if an element with id "twitter-wjs" already exists it does nothing further. Otherwise it creates a script element with that id pointing at https://platform.twitter.com/widgets.js, inserts it before the first script element in the document, and gives window.twttr an _e queue plus a ready(f) method that pushes callbacks onto that queue. --><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>