blob: 626ead36b34aec00953454b4a21d3c4c83fe1c95 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar Clients · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar exposes a client API with language bindings for [Java](/docs/en/2.7.0/client-libraries-java), [Go](/docs/en/2.7.0/client-libraries-go), [Python](/docs/en/2.7.0/client-libraries-python), [C++](/docs/en/2.7.0/client-libraries-cpp) and [C#](/docs/en/2.7.0/client-libraries-dotnet). The client API optimizes and encapsulates Pulsar&#x27;s client-broker communication protocol and exposes a simple and intuitive API for use by applications."/><meta name="docsearch:version" content="2.7.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar Clients · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar exposes a client API with language bindings for [Java](/docs/en/2.7.0/client-libraries-java), [Go](/docs/en/2.7.0/client-libraries-go), [Python](/docs/en/2.7.0/client-libraries-python), [C++](/docs/en/2.7.0/client-libraries-cpp) and [C#](/docs/en/2.7.0/client-libraries-dotnet). The client API optimizes and encapsulates Pulsar&#x27;s client-broker communication protocol and exposes a simple and intuitive API for use by applications."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.7.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.7.0/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.7.0/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.7.0/concepts-clients">日本語</a></li><li><a href="/docs/fr/2.7.0/concepts-clients">Français</a></li><li><a href="/docs/ko/2.7.0/concepts-clients">한국어</a></li><li><a href="/docs/zh-CN/2.7.0/concepts-clients">中文</a></li><li><a href="/docs/zh-TW/2.7.0/concepts-clients">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Concepts and Architecture</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-architecture-overview">Architecture</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.7.0/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/tiered-storage-azure">Azure BlobStore offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/transactions">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/transactions-guarantee">Transactions Guarantee</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/transactions-api">Transactions API</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-extending">Extending</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/client-libraries-dotnet">C#</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.7.0/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/concepts-clients.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar Clients</h1></header><article><div><span><p>Pulsar exposes a client API with language bindings for <a href="/docs/en/2.7.0/client-libraries-java">Java</a>, <a href="/docs/en/2.7.0/client-libraries-go">Go</a>, <a href="/docs/en/2.7.0/client-libraries-python">Python</a>, <a href="/docs/en/2.7.0/client-libraries-cpp">C++</a> and <a href="/docs/en/2.7.0/client-libraries-dotnet">C#</a>. The client API optimizes and encapsulates Pulsar's client-broker communication protocol and exposes a simple and intuitive API for use by applications.</p>
<p>Under the hood, the current official Pulsar client libraries support transparent reconnection and/or connection failover to brokers, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.</p>
<blockquote>
<p><strong>Custom client libraries</strong>
If you'd like to create your own client library, we recommend consulting the documentation on Pulsar's custom <a href="/docs/en/2.7.0/developing-binary-protocol">binary protocol</a>.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="client-setup-phase"></a><a href="#client-setup-phase" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client setup phase</h2>
<p>Before an application creates a producer/consumer, the Pulsar client library needs to initiate a setup phase including two steps:</p>
<ol>
<li>The client attempts to determine the owner of the topic by sending an HTTP lookup request to the broker. The request could reach one of the active brokers which, by looking at the (cached) zookeeper metadata knows who is serving the topic or, in case nobody is serving it, tries to assign it to the least loaded broker.</li>
<li>Once the client library has the broker address, it creates a TCP connection (or reuse an existing connection from the pool) and authenticates it. Within this connection, client and broker exchange binary commands from a custom protocol. At this point the client sends a command to create producer/consumer to the broker, which will comply after having validated the authorization policy.</li>
</ol>
<p>Whenever the TCP connection breaks, the client immediately re-initiates this setup phase and keeps trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.</p>
<h2><a class="anchor" aria-hidden="true" id="reader-interface"></a><a href="#reader-interface" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader interface</h2>
<p>In Pulsar, the &quot;standard&quot; <a href="/docs/en/2.7.0/concepts-messaging#consumers">consumer interface</a> involves using consumers to listen on <a href="/docs/en/2.7.0/reference-terminology#topic">topics</a>, process incoming messages, and finally acknowledge those messages when they are processed. Whenever a new subscription is created, it is initially positioned at the end of the topic (by default), and consumers associated with that subscription begin reading with the first message created afterwards. Whenever a consumer connects to a topic using a pre-existing subscription, it begins reading from the earliest message un-acked within that subscription. In summary, with the consumer interface, subscription cursors are automatically managed by Pulsar in response to <a href="/docs/en/2.7.0/concepts-messaging#acknowledgement">message acknowledgements</a>.</p>
<p>The <strong>reader interface</strong> for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic---rather than a consumer---you need to specify <em>which</em> message the reader begins reading from when it connects to a topic. When connecting to a topic, the reader interface enables you to begin with:</p>
<ul>
<li>The <strong>earliest</strong> available message in the topic</li>
<li>The <strong>latest</strong> available message in the topic</li>
<li>Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for &quot;knowing&quot; this message ID in advance, perhaps fetching it from a persistent data store or cache.</li>
</ul>
<p>The reader interface is helpful for use cases like using Pulsar to provide effectively-once processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to &quot;rewind&quot; topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to &quot;manually position&quot; themselves within a topic.</p>
<p>Internally, the reader interface is implemented as a consumer using an exclusive, non-durable subscription to the topic with a randomly-allocated name.</p>
<p>[ <strong>IMPORTANT</strong> ]</p>
<p>Unlike subscription/consumer, readers are non-durable in nature and does not prevent data in a topic from being deleted, thus it is <strong><em>strongly</em></strong> advised that <a href="/docs/en/2.7.0/cookbooks-retention-expiry">data retention</a> be configured. If data retention for a topic is not configured for an adequate amount of time, messages that the reader has not yet read might be deleted . This causes the readers to essentially skip messages. Configuring the data retention for a topic guarantees the reader with a certain duration to read a message.</p>
<p>Please also note that a reader can have a &quot;backlog&quot;, but the metric is only used for users to know how behind the reader is. The metric is not considered for any backlog quota calculations.</p>
<p><img src="/docs/assets/pulsar-reader-consumer-interfaces.png" alt="The Pulsar consumer and reader interfaces"></p>
<p>Here's a Java example that begins reading from the earliest available message on a topic:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.client.api.Message;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.MessageId;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.Reader;
<span class="hljs-comment">// Create a reader on a topic and for a specific message (and onward)</span>
Reader&lt;<span class="hljs-keyword">byte</span>[]&gt; reader = pulsarClient.newReader()
.topic(<span class="hljs-string">"reader-api-test"</span>)
.startMessageId(MessageId.earliest)
.create();
<span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
Message message = reader.readNext();
<span class="hljs-comment">// Process the message</span>
}
</code></pre>
<p>To create a reader that reads from the latest available message:</p>
<pre><code class="hljs css language-java">Reader&lt;<span class="hljs-keyword">byte</span>[]&gt; reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.latest)
.create();
</code></pre>
<p>To create a reader that reads from some message between the earliest and the latest:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">byte</span>[] msgIdBytes = <span class="hljs-comment">// Some byte array</span>
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader&lt;<span class="hljs-keyword">byte</span>[]&gt; reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.7.0/concepts-architecture-overview"><span class="arrow-prev"></span><span>Architecture</span></a><a class="docs-next button" href="/docs/en/2.7.0/concepts-replication"><span>Geo Replication</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#client-setup-phase">Client setup phase</a></li><li><a href="#reader-interface">Reader interface</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>