blob: 84932e2d3d4469fb8b74034949d92249a50a0283 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>The Pulsar Java client · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Pulsar Java client can be used both to create Java producers, consumers, and [readers](#reader-interface) of messages and to perform [administrative tasks](/docs/en/2.3.1/admin-api-overview). The current version of the Java client is **2.3.1**."/><meta name="docsearch:version" content="2.3.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="The Pulsar Java client · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Pulsar Java client can be used both to create Java producers, consumers, and [readers](#reader-interface) of messages and to perform [administrative tasks](/docs/en/2.3.1/admin-api-overview). 
The current version of the Java client is **2.3.1**."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.3.1</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.3.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.3.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" 
target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.3.1/client-libraries-java">日本語</a></li><li><a href="/docs/fr/2.3.1/client-libraries-java">Français</a></li><li><a href="/docs/ko/2.3.1/client-libraries-java">한국어</a></li><li><a href="/docs/zh-CN/2.3.1/client-libraries-java">中文</a></li><li><a href="/docs/zh-TW/2.3.1/client-libraries-java">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language picker: the "languages-menu" link shows/hides the
// "languages-dropdown" list by swapping its class between "hide" and "visible".
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", (event) => {
  // The menu link points at "#", so suppress the default navigation.
  event.preventDefault();
  languagesDropDown.className =
    languagesDropDown.className == "hide" ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.3.1/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/functions-metrics">Metrics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/io-cdc">CDC Connector</a></li></ul></div><div 
class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-getting-started">Getting Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/en/2.3.1/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.3.1/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.3.1/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.3.1/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.3.1/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script>
// Set up every sidebar element carrying the "collapsible" class. For each one,
// the element that immediately follows it is treated as its content, and its
// second child node as the arrow indicator.
var coll = document.getElementsByClassName('collapsible');
// Stays true until the first category containing the active nav item is found,
// so only that one category has its initial state toggled.
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
  var links = coll[i].nextElementSibling.getElementsByTagName('*');
  if (checkActiveCategory) {
    for (var j = 0; j < links.length; j++) {
      if (!links[j].classList.contains('navListItemActive')) {
        continue;
      }
      // This category holds the active page: flip its "hide"/"rotate" classes
      // once (presumably expanding it) and stop searching.
      coll[i].nextElementSibling.classList.toggle('hide');
      coll[i].childNodes[1].classList.toggle('rotate');
      checkActiveCategory = false;
      break;
    }
  }
  // Clicking a category header flips the arrow and the content visibility classes.
  coll[i].addEventListener('click', function() {
    this.childNodes[1].classList.toggle('rotate');
    this.nextElementSibling.classList.toggle('hide');
  });
}
// Once the DOM is parsed, wire up the sidebar and table-of-contents togglers.
document.addEventListener('DOMContentLoaded', function() {
  /**
   * Makes a click on the element matching `togglerSelector` toggle the class
   * `className` on the element matching `targetSelector`.
   *
   * Does nothing when either selector matches no element; without the target
   * check, every click would throw on `target.classList`.
   */
  function createToggler(togglerSelector, targetSelector, className) {
    var toggler = document.querySelector(togglerSelector);
    var target = document.querySelector(targetSelector);
    if (!toggler || !target) {
      return;
    }
    toggler.onclick = function(event) {
      event.preventDefault();
      target.classList.toggle(className);
    };
  }

  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');

  // Clicking a link inside the on-page TOC removes the "tocActive" class from
  // <body>. The ".toc-headings" element may be absent, hence the guard.
  var headings = document.querySelector('.toc-headings');
  if (headings) {
    headings.addEventListener('click', function(event) {
      // Walk from the clicked node up to (but not including) the TOC container,
      // looking for an enclosing anchor.
      for (var el = event.target; el !== headings; el = el.parentNode) {
        if (el.tagName === 'A') {
          document.body.classList.remove('tocActive');
          break;
        }
      }
    }, false);
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-java.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">The Pulsar Java client</h1></header><article><div><span><p>The Pulsar Java client can be used both to create Java producers, consumers, and <a href="#reader-interface">readers</a> of messages and to perform <a href="/docs/en/2.3.1/admin-api-overview">administrative tasks</a>. The current version of the Java client is <strong>2.3.1</strong>.</p>
<p>Javadoc for the Pulsar client is divided up into two domains, by package:</p>
<table>
<thead>
<tr><th style="text-align:left">Package</th><th style="text-align:left">Description</th><th style="text-align:left">Maven Artifact</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="/api/client/2.3.1"><code>org.apache.pulsar.client.api</code></a></td><td style="text-align:left">The producer and consumer API</td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C2.3.1%7Cjar">org.apache.pulsar:pulsar-client:2.3.1</a></td></tr>
<tr><td style="text-align:left"><a href="/api/admin/2.3.1"><code>org.apache.pulsar.client.admin</code></a></td><td style="text-align:left">The Java <a href="/docs/en/2.3.1/admin-api-overview">admin API</a></td><td style="text-align:left"><a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client-admin%7C2.3.1%7Cjar">org.apache.pulsar:pulsar-client-admin:2.3.1</a></td></tr>
</tbody>
</table>
<p>This document will focus only on the client API for producing and consuming messages on Pulsar topics. For a guide to using the Java admin client, see <a href="/docs/en/2.3.1/admin-api-overview">The Pulsar admin interface</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="installation"></a><a href="#installation" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installation</h2>
<p>The latest version of the Pulsar Java client library is available via <a href="http://search.maven.org/#artifactdetails%7Corg.apache.pulsar%7Cpulsar-client%7C2.3.1%7Cjar">Maven Central</a>. To use the latest version, add the <code>pulsar-client</code> library to your build configuration.</p>
<h3><a class="anchor" aria-hidden="true" id="maven"></a><a href="#maven" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Maven</h3>
<p>If you're using Maven, add this to your <code>pom.xml</code>:</p>
<pre><code class="hljs css language-xml"><span class="hljs-comment">&lt;!-- in your &lt;properties&gt; block --&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">pulsar.version</span>&gt;</span>2.3.1<span class="hljs-tag">&lt;/<span class="hljs-name">pulsar.version</span>&gt;</span>
<span class="hljs-comment">&lt;!-- in your &lt;dependencies&gt; block --&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-client<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>${pulsar.version}<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="gradle"></a><a href="#gradle" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Gradle</h3>
<p>If you're using Gradle, add this to your <code>build.gradle</code> file:</p>
<pre><code class="hljs css language-groovy"><span class="hljs-keyword">def</span> pulsarVersion = <span class="hljs-string">'2.3.1'</span>
dependencies {
compile <span class="hljs-string">group:</span> <span class="hljs-string">'org.apache.pulsar'</span>, <span class="hljs-string">name:</span> <span class="hljs-string">'pulsar-client'</span>, <span class="hljs-string">version:</span> pulsarVersion
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="connection-urls"></a><a href="#connection-urls" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connection URLs</h2>
<p>To connect to Pulsar using client libraries, you need to specify a <a href="/docs/en/2.3.1/developing-binary-protocol">Pulsar protocol</a> URL.</p>
<p>Pulsar protocol URLs are assigned to specific clusters, use the <code>pulsar</code> scheme and have a default port of 6650. Here's an example for <code>localhost</code>:</p>
<pre><code class="hljs css language-http">pulsar://localhost:6650
</code></pre>
<p>A URL for a production Pulsar cluster may look something like this:</p>
<pre><code class="hljs css language-http">pulsar://pulsar.us-west.example.com:6650
</code></pre>
<p>If you're using <a href="/docs/en/2.3.1/security-tls-authentication">TLS</a> authentication, the URL will look something like this:</p>
<pre><code class="hljs css language-http">pulsar+ssl://pulsar.us-west.example.com:6651
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="client-configuration"></a><a href="#client-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client configuration</h2>
<p>You can instantiate a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object using just a URL for the target Pulsar <a href="/docs/en/2.3.1/reference-terminology#cluster">cluster</a>, like this:</p>
<pre><code class="hljs css language-java">PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar://localhost:6650"</span>)
.build();
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="default-broker-urls-for-standalone-clusters"></a><a href="#default-broker-urls-for-standalone-clusters" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Default broker URLs for standalone clusters</h4>
<p>If you're running a cluster in <a href="/docs/en/2.3.1/getting-started-standalone">standalone mode</a>, the broker will be available at the <code>pulsar://localhost:6650</code> URL by default.</p>
</blockquote>
<p>Check out the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
class for a full listing of configurable parameters.</p>
<blockquote>
<p>In addition to client-level configuration, you can also apply <a href="#configuring-producers">producer</a> and <a href="#configuring-consumers">consumer</a> specific configuration, as you'll see in the sections below.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="producers"></a><a href="#producers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producers</h2>
<p>In Pulsar, producers write messages to topics. Once you've instantiated a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object (as in the section <a href="#client-configuration">above</a>), you can create a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/Producer">Producer</a>
for a specific Pulsar <a href="/docs/en/2.3.1/reference-terminology#topic">topic</a>.</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.create();
<span class="hljs-comment">// You can then send messages to the broker and topic you specified:</span>
producer.send(<span class="hljs-string">"My message"</span>.getBytes());
</code></pre>
<p>By default, producers produce messages that consist of byte arrays. You can produce different types, however, by specifying a message <a href="#schemas">schema</a>.</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; stringProducer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"my-topic"</span>)
.create();
stringProducer.send(<span class="hljs-string">"My message"</span>);
</code></pre>
<blockquote>
<p>You should always make sure to close your producers, consumers, and clients when they are no longer needed:</p>
<pre><code class="hljs css language-java">producer.close();
consumer.close();
client.close();
</code></pre>
<p>Close operations can also be asynchronous:</p>
<pre><code class="hljs css language-java">producer.closeAsync()
.thenRun(() -&gt; System.out.println(<span class="hljs-string">"Producer closed"</span>))
.exceptionally((ex) -&gt; {
System.err.println(<span class="hljs-string">"Failed to close producer: "</span> + ex);
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
});
</code></pre>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="configuring-producers"></a><a href="#configuring-producers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuring producers</h3>
<p>If you instantiate a <code>Producer</code> object specifying only a topic name, as in the example above, the producer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ProducerBuilder">ProducerBuilder</a>
class. Here's an example:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(<span class="hljs-string">"my-topic"</span>)
.batchingMaxPublishDelay(<span class="hljs-number">10</span>, TimeUnit.MILLISECONDS)
.sendTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.blockIfQueueFull(<span class="hljs-keyword">true</span>)
.create();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="message-routing"></a><a href="#message-routing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Message routing</h3>
<p>When using partitioned topics, you can specify the routing mode whenever you publish messages using a producer. For more on specifying a routing mode using the Java client, see the <a href="/docs/en/2.3.1/cookbooks-partitioned">Partitioned Topics</a> cookbook.</p>
<h3><a class="anchor" aria-hidden="true" id="async-send"></a><a href="#async-send" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Async send</h3>
<p>You can also publish messages <a href="/docs/en/2.3.1/concepts-messaging#send-modes">asynchronously</a> using the Java client. With async send, the producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size configurable), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer.</p>
<p>Here's an example async send operation:</p>
<pre><code class="hljs css language-java">producer.sendAsync(<span class="hljs-string">"my-async-message"</span>.getBytes()).thenAccept(msgId -&gt; {
System.out.printf(<span class="hljs-string">"Message with ID %s successfully sent"</span>, msgId);
});
</code></pre>
<p>As you can see from the example above, async send operations return a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/MessageId">MessageId</a>
wrapped in a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a>.</p>
<h3><a class="anchor" aria-hidden="true" id="configuring-messages"></a><a href="#configuring-messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuring messages</h3>
<p>In addition to a value, it's possible to set additional items on a given message:</p>
<pre><code class="hljs css language-java">producer.newMessage()
.key(<span class="hljs-string">"my-message-key"</span>)
.value(<span class="hljs-string">"my-async-message"</span>.getBytes())
.property(<span class="hljs-string">"my-key"</span>, <span class="hljs-string">"my-value"</span>)
.property(<span class="hljs-string">"my-other-key"</span>, <span class="hljs-string">"my-other-value"</span>)
.send();
</code></pre>
<p>As with the async send example above, you can also terminate the builder chain with <code>sendAsync()</code> and
get a future returned.</p>
<h2><a class="anchor" aria-hidden="true" id="consumers"></a><a href="#consumers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumers</h2>
<p>In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new <a href="/docs/en/2.3.1/reference-terminology#consumer">consumer</a> by first instantiating a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object and passing it a URL for a Pulsar broker (as <a href="#client-configuration">above</a>).</p>
<p>Once you've instantiated a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/PulsarClient">PulsarClient</a>
object, you can create a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/Consumer">Consumer</a>
by specifying a <a href="/docs/en/2.3.1/reference-terminology#topic">topic</a> and a <a href="/docs/en/2.3.1/concepts-messaging#subscription-modes">subscription</a>.</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.subscribe();
</code></pre>
<p>The <code>subscribe</code> method will automatically subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a <code>while</code> loop. In this example loop, the consumer listens for messages, prints the contents of any message that's received, and then <a href="/docs/en/2.3.1/reference-terminology#acknowledgment-ack">acknowledges</a> that the message has been processed. If the processing logic fails, we use <a href="/docs/en/2.3.1/reference-terminology#acknowledgment-ack">negative acknowledgement</a>
to have the message redelivered at a later point in time.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
<span class="hljs-comment">// Wait for a message</span>
Message msg = consumer.receive();
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">// Do something with the message</span>
System.out.printf(<span class="hljs-string">"Message received: %s"</span>, <span class="hljs-keyword">new</span> String(msg.getData()));
<span class="hljs-comment">// Acknowledge the message so that it can be deleted by the message broker</span>
consumer.acknowledge(msg);
} <span class="hljs-keyword">catch</span> (Exception e) {
<span class="hljs-comment">// Message failed to process, redeliver later</span>
consumer.negativeAcknowledge(msg);
}
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="configuring-consumers"></a><a href="#configuring-consumers" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuring consumers</h3>
<p>If you instantiate a <code>Consumer</code> object specifying only a topic and subscription name, as in the example above, the consumer will use the default configuration. To use a non-default configuration, there's a variety of configurable parameters that you can set. For a full listing, see the Javadoc for the <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ConsumerBuilder">ConsumerBuilder</a>
class.</p>
<p>Here's an example configuration:</p>
<pre><code class="hljs css language-java">Consumer consumer = client.newConsumer()
.topic(<span class="hljs-string">"my-topic"</span>)
.subscriptionName(<span class="hljs-string">"my-subscription"</span>)
.ackTimeout(<span class="hljs-number">10</span>, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="async-receive"></a><a href="#async-receive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Async receive</h3>
<p>The <code>receive</code> method will receive messages synchronously (the consumer process will be blocked until a message is available). You can also use <a href="/docs/en/2.3.1/concepts-messaging#receive-modes">async receive</a>, which will return immediately with a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a> object that completes once a new message is available.</p>
<p>Here's an example:</p>
<pre><code class="hljs css language-java">CompletableFuture&lt;Message&gt; asyncMessage = consumer.receiveAsync();
</code></pre>
<p>Async receive operations return a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/Message">Message</a>
wrapped inside of a <a href="http://www.baeldung.com/java-completablefuture"><code>CompletableFuture</code></a>.</p>
<h3><a class="anchor" aria-hidden="true" id="multi-topic-subscriptions"></a><a href="#multi-topic-subscriptions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Multi-topic subscriptions</h3>
<p>In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously using <a href="/docs/en/2.3.1/concepts-messaging#multi-topic-subscriptions">multi-topic subscriptions</a>. To use multi-topic subscriptions you can supply either a regular expression (regex) or a <code>List</code> of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.</p>
<p>Here are some examples:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.client.api.Consumer;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.PulsarClient;
<span class="hljs-keyword">import</span> java.util.Arrays;
<span class="hljs-keyword">import</span> java.util.List;
<span class="hljs-keyword">import</span> java.util.regex.Pattern;
ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
<span class="hljs-comment">// Subscribe to all topics in a namespace</span>
Pattern allTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default/.*"</span>);
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
<span class="hljs-comment">// Subscribe to a subset of topics in a namespace, based on regex</span>
Pattern someTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default/foo.*"</span>);
Consumer someTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
</code></pre>
<p>You can also subscribe to an explicit list of topics (across namespaces if you wish):</p>
<pre><code class="hljs css language-java">List&lt;String&gt; topics = Arrays.asList(
<span class="hljs-string">"topic-1"</span>,
<span class="hljs-string">"topic-2"</span>,
<span class="hljs-string">"topic-3"</span>
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.subscribe();
<span class="hljs-comment">// Alternatively:</span>
Consumer multiTopicConsumer = consumerBuilder
.topics(
<span class="hljs-string">"topic-1"</span>,
<span class="hljs-string">"topic-2"</span>,
<span class="hljs-string">"topic-3"</span>
)
.subscribe();
</code></pre>
<p>You can also subscribe to multiple topics asynchronously using the <code>subscribeAsync</code> method rather than the synchronous <code>subscribe</code> method. Here's an example:</p>
<pre><code class="hljs css language-java">Pattern allTopicsInNamespace = Pattern.compile(<span class="hljs-string">"persistent://public/default/.*"</span>);
consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribeAsync()
.thenAccept(consumer -&gt; {
<span class="hljs-keyword">do</span> {
<span class="hljs-keyword">try</span> {
Message msg = consumer.receive();
<span class="hljs-comment">// Do something with the received message</span>
} <span class="hljs-keyword">catch</span> (PulsarClientException e) {
e.printStackTrace();
}
} <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>);
});
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="reader-interface"></a><a href="#reader-interface" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader interface</h2>
<p>With the <a href="/docs/en/2.3.1/concepts-clients#reader-interface">reader interface</a>, Pulsar clients can &quot;manually position&quot; themselves within a topic, reading all messages from a specified message onward. The Pulsar API for Java enables you to create <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/Reader">Reader</a>
objects by specifying a topic, a <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/MessageId">MessageId</a>, and <a href="https://pulsar.apache.org/api/client/2.3.1/org/apache/pulsar/client/api/ReaderConfiguration">ReaderConfiguration</a>.</p>
<p>Here's an example:</p>
<pre><code class="hljs css language-java">ReaderConfiguration conf = <span class="hljs-keyword">new</span> ReaderConfiguration();
<span class="hljs-keyword">byte</span>[] msgIdBytes = <span class="hljs-comment">// Some message ID byte array</span>
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
<span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
Message message = reader.readNext();
<span class="hljs-comment">// Process message</span>
}
</code></pre>
<p>In the example above, a <code>Reader</code> object is instantiated for a specific topic and message (by ID); the reader then iterates over each message in the topic after the message identified by <code>msgIdBytes</code> (how that value is obtained depends on the application).</p>
<p>The code sample above shows pointing the <code>Reader</code> object to a specific message (by ID), but you can also use <code>MessageId.earliest</code> to point to the earliest available message on the topic or <code>MessageId.latest</code> to point to the most recent available message.</p>
<h2><a class="anchor" aria-hidden="true" id="schemas"></a><a href="#schemas" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schemas</h2>
<p>In Pulsar, all message data consists of byte arrays &quot;under the hood.&quot; <a href="/docs/en/2.3.1/concepts-schema-registry">Message schemas</a> enable you to use other types of data when constructing and handling messages (from simple types like strings to more complex, application-specific types). If you construct, say, a <a href="#producers">producer</a> without specifying a schema, then the producer can only produce messages of type <code>byte[]</code>. Here's an example:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; producer = client.newProducer()
.topic(topic)
.create();
</code></pre>
<p>The producer above is equivalent to a <code>Producer&lt;byte[]&gt;</code> (in fact, you should <em>always</em> explicitly specify the type). If you'd like to use a producer for a different type of data, you'll need to specify a <strong>schema</strong> that informs Pulsar which data type will be transmitted over the <a href="/docs/en/2.3.1/reference-terminology#topic">topic</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="schema-example"></a><a href="#schema-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema example</h3>
<p>Let's say that you have a <code>SensorReading</code> class that you'd like to transmit over a Pulsar topic:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">SensorReading</span> </span>{
<span class="hljs-keyword">public</span> <span class="hljs-keyword">float</span> temperature;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">SensorReading</span><span class="hljs-params">(<span class="hljs-keyword">float</span> temperature)</span> </span>{
<span class="hljs-keyword">this</span>.temperature = temperature;
}
<span class="hljs-comment">// A no-arg constructor is required</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">SensorReading</span><span class="hljs-params">()</span> </span>{
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">float</span> <span class="hljs-title">getTemperature</span><span class="hljs-params">()</span> </span>{
<span class="hljs-keyword">return</span> temperature;
}
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">setTemperature</span><span class="hljs-params">(<span class="hljs-keyword">float</span> temperature)</span> </span>{
<span class="hljs-keyword">this</span>.temperature = temperature;
}
}
</code></pre>
<p>You could then create a <code>Producer&lt;SensorReading&gt;</code> (or <code>Consumer&lt;SensorReading&gt;</code>) like so:</p>
<pre><code class="hljs css language-java">Producer&lt;SensorReading&gt; producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-readings")
.create();
</code></pre>
<p>The following schema formats are currently available for Java:</p>
<ul>
<li><p>No schema or the byte array schema (which can be applied using <code>Schema.BYTES</code>):</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; bytesProducer = client.newProducer(Schema.BYTES)
.topic(<span class="hljs-string">"some-raw-bytes-topic"</span>)
.create();
</code></pre>
<p>Or, equivalently:</p>
<pre><code class="hljs css language-java">Producer&lt;<span class="hljs-keyword">byte</span>[]&gt; bytesProducer = client.newProducer()
.topic(<span class="hljs-string">"some-raw-bytes-topic"</span>)
.create();
</code></pre></li>
<li><p><code>String</code> for normal UTF-8-encoded string data. This schema can be applied using <code>Schema.STRING</code>:</p>
<pre><code class="hljs css language-java">Producer&lt;String&gt; stringProducer = client.newProducer(Schema.STRING)
.topic(<span class="hljs-string">"some-string-topic"</span>)
.create();
</code></pre></li>
<li><p>JSON schemas can be created for POJOs using the <code>JSONSchema</code> class. Here's an example:</p>
<pre><code class="hljs css language-java">Schema&lt;MyPojo&gt; pojoSchema = JSONSchema.of(MyPojo<span class="hljs-class">.<span class="hljs-keyword">class</span>)</span>;
Producer&lt;MyPojo&gt; pojoProducer = client.newProducer(pojoSchema)
.topic(<span class="hljs-string">"some-pojo-topic"</span>)
.create();
</code></pre></li>
</ul>
<h2><a class="anchor" aria-hidden="true" id="authentication"></a><a href="#authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Authentication</h2>
<p>Pulsar currently supports two authentication schemes: <a href="/docs/en/2.3.1/security-tls-authentication">TLS</a> and <a href="/docs/en/2.3.1/security-athenz">Athenz</a>. The Pulsar Java client can be used with both.</p>
<h3><a class="anchor" aria-hidden="true" id="tls-authentication"></a><a href="#tls-authentication" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>TLS Authentication</h3>
<p>To use <a href="/docs/en/2.3.1/security-tls-authentication">TLS</a>, you need to set TLS to <code>true</code> using the <code>enableTls</code> method, point your Pulsar client to a TLS cert path, and provide paths to cert and key files.</p>
<p>Here's an example configuration:</p>
<pre><code class="hljs css language-java">Map&lt;String, String&gt; authParams = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
authParams.put(<span class="hljs-string">"tlsCertFile"</span>, <span class="hljs-string">"/path/to/client-cert.pem"</span>);
authParams.put(<span class="hljs-string">"tlsKeyFile"</span>, <span class="hljs-string">"/path/to/client-key.pem"</span>);
Authentication tlsAuth = AuthenticationFactory
.create(AuthenticationTls<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>(), <span class="hljs-title">authParams</span>)</span>;
PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar+ssl://my-broker.com:6651"</span>)
.enableTls(<span class="hljs-keyword">true</span>)
.tlsTrustCertsFilePath(<span class="hljs-string">"/path/to/cacert.pem"</span>)
.authentication(tlsAuth)
.build();
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="athenz"></a><a href="#athenz" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Athenz</h3>
<p>To use <a href="/docs/en/2.3.1/security-athenz">Athenz</a> as an authentication provider, you need to <a href="#tls-authentication">use TLS</a> and provide values for four parameters in a hash:</p>
<ul>
<li><code>tenantDomain</code></li>
<li><code>tenantService</code></li>
<li><code>providerDomain</code></li>
<li><code>privateKey</code></li>
</ul>
<p>You can also set an optional <code>keyId</code>. Here's an example configuration:</p>
<pre><code class="hljs css language-java">Map&lt;String, String&gt; authParams = <span class="hljs-keyword">new</span> HashMap&lt;&gt;();
authParams.put(<span class="hljs-string">"tenantDomain"</span>, <span class="hljs-string">"shopping"</span>); <span class="hljs-comment">// Tenant domain name</span>
authParams.put(<span class="hljs-string">"tenantService"</span>, <span class="hljs-string">"some_app"</span>); <span class="hljs-comment">// Tenant service name</span>
authParams.put(<span class="hljs-string">"providerDomain"</span>, <span class="hljs-string">"pulsar"</span>); <span class="hljs-comment">// Provider domain name</span>
authParams.put(<span class="hljs-string">"privateKey"</span>, <span class="hljs-string">"file:///path/to/private.pem"</span>); <span class="hljs-comment">// Tenant private key path</span>
authParams.put(<span class="hljs-string">"keyId"</span>, <span class="hljs-string">"v1"</span>); <span class="hljs-comment">// Key id for the tenant private key (optional, default: "0")</span>
Authentication athenzAuth = AuthenticationFactory
.create(AuthenticationAthenz<span class="hljs-class">.<span class="hljs-keyword">class</span>.<span class="hljs-title">getName</span>(), <span class="hljs-title">authParams</span>)</span>;
PulsarClient client = PulsarClient.builder()
.serviceUrl(<span class="hljs-string">"pulsar+ssl://my-broker.com:6651"</span>)
.enableTls(<span class="hljs-keyword">true</span>)
.tlsTrustCertsFilePath(<span class="hljs-string">"/path/to/cacert.pem"</span>)
.authentication(athenzAuth)
.build();
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="supported-pattern-formats"></a><a href="#supported-pattern-formats" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Supported pattern formats</h4>
<p>The <code>privateKey</code> parameter supports the following three pattern formats:</p>
<ul>
<li><code>file:///path/to/file</code></li>
<li><code>file:/path/to/file</code></li>
<li><code>data:application/x-pem-file;base64,&lt;base64-encoded value&gt;</code></li>
</ul>
</blockquote>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.3.1/security-extending"><span class="arrow-prev"></span><span>Extend Authentication and Authorization</span></a><a class="docs-next button" href="/docs/en/2.3.1/client-libraries-go"><span>Go</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#installation">Installation</a><ul class="toc-headings"><li><a href="#maven">Maven</a></li><li><a href="#gradle">Gradle</a></li></ul></li><li><a href="#connection-urls">Connection URLs</a></li><li><a href="#client-configuration">Client configuration</a></li><li><a href="#producers">Producers</a><ul class="toc-headings"><li><a href="#configuring-producers">Configuring producers</a></li><li><a href="#message-routing">Message routing</a></li><li><a href="#async-send">Async send</a></li><li><a href="#configuring-messages">Configuring messages</a></li></ul></li><li><a href="#consumers">Consumers</a><ul class="toc-headings"><li><a href="#configuring-consumers">Configuring consumers</a></li><li><a href="#async-receive">Async receive</a></li><li><a href="#multi-topic-subscriptions">Multi-topic subscriptions</a></li></ul></li><li><a href="#reader-interface">Reader interface</a></li><li><a href="#schemas">Schemas</a><ul class="toc-headings"><li><a href="#schema-example">Schema example</a></li></ul></li><li><a href="#authentication">Authentication</a><ul class="toc-headings"><li><a href="#tls-authentication">TLS Authentication</a></li><li><a href="#athenz">Athenz</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Community dropdown: swaps the contents of the header's "Community" nav entry
// (the parent of the a[href='#community'] placeholder link) for a menu that is
// shown and hidden by clicking its trigger link.
// Wrapped in a function so the helper names below stay out of the global scope.
(function () {
  const communityLink = document.querySelector("a[href='#community']");
  // querySelector returns null when the header has no Community placeholder;
  // bail out instead of throwing on `.parentNode` and aborting the script.
  if (!communityLink) {
    return;
  }
  const community = communityLink.parentNode;

  // Links opened with target="_blank" carry rel="noopener" so the new tab gets
  // no window.opener handle; every character reference is ';'-terminated.
  const communityMenu =
    '<li>' +
    '<a id="community-menu" href="#" aria-controls="community-dropdown" aria-expanded="false">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
    '<div id="community-dropdown" class="hide">' +
    '<ul id="community-dropdown-items">' +
    '<li><a href="/en/contact">Contact</a></li>' +
    '<li><a href="/en/contributing">Contributing</a></li>' +
    '<li><a href="/en/coding-guide">Coding guide</a></li>' +
    '<li><a href="/en/events">Events</a></li>' +
    '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener">Twitter &#x2750;</a></li>' +
    '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener">Wiki &#x2750;</a></li>' +
    '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener">Issue tracking &#x2750;</a></li>' +
    '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener">Pulsar Summit &#x2750;</a></li>' +
    '<li>&nbsp;</li>' +
    '<li><a href="/en/resources">Resources</a></li>' +
    '<li><a href="/en/team">Team</a></li>' +
    '<li><a href="/en/powered-by">Powered By</a></li>' +
    '</ul>' +
    '</div>' +
    '</li>';
  community.innerHTML = communityMenu;

  const communityMenuItem = document.getElementById("community-menu");
  const communityDropDown = document.getElementById("community-dropdown");
  communityMenuItem.addEventListener("click", function (event) {
    // The trigger is an href="#" link; keep the click from navigating.
    event.preventDefault();
    // The dropdown's class is swapped between exactly 'hide' and 'visible'.
    const opening = communityDropDown.className === 'hide';
    communityDropDown.className = opening ? 'visible' : 'hide';
    // Mirror the visual state for assistive technology.
    communityMenuItem.setAttribute('aria-expanded', opening ? 'true' : 'false');
  });
})();
</script></span></footer></div><!-- Twitter widgets loader: assigns window.twttr. Unless an element with id "twitter-wjs" already exists, it creates a script element with that id and src https://platform.twitter.com/widgets.js and inserts it before the first script element in the document. The returned object gets an _e array and a ready(f) function that pushes f onto that array. --><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>