blob: a509dd242e501fd838245a587311bafa22fa5037 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Using Pulsar as a message queue · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Message queues are essential components of many large-scale data architectures. If every single work object that passes through your system absolutely *must* be processed in spite of the slowness or downright failure of this or that system component, there&#x27;s a good chance that you&#x27;ll need a message queue to step in and ensure that unprocessed data is retained---with correct ordering---until the required actions are taken."/><meta name="docsearch:version" content="2.9.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Using Pulsar as a message queue · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Message queues are essential components of many large-scale data architectures. If every single work object that passes through your system absolutely *must* be processed in spite of the slowness or downright failure of this or that system component, there&#x27;s a good chance that you&#x27;ll need a message queue to step in and ensure that unprocessed data is retained---with correct ordering---until the required actions are taken."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.9.1</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.9.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.9.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.9.1/cookbooks-message-queue">日本語</a></li><li><a href="/docs/fr/2.9.1/cookbooks-message-queue">Français</a></li><li><a href="/docs/ko/2.9.1/cookbooks-message-queue">한국어</a></li><li><a href="/docs/zh-CN/2.9.1/cookbooks-message-queue">中文</a></li><li><a href="/docs/zh-TW/2.9.1/cookbooks-message-queue">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Cookbooks</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/tiered-storage-azure">Azure BlobStore offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/tiered-storage-aliyun">Aliyun OSS offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-extending">Extending</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/client-libraries-dotnet">C#</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/cookbooks-encryption">Encryption</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.9.1/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/develop-load-manager">Modular load manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.1/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/cookbooks-message-queue.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Using Pulsar as a message queue</h1></header><article><div><span><p>Message queues are essential components of many large-scale data architectures. If every single work object that passes through your system absolutely <em>must</em> be processed in spite of the slowness or downright failure of this or that system component, there's a good chance that you'll need a message queue to step in and ensure that unprocessed data is retained---with correct ordering---until the required actions are taken.</p>
<p>Pulsar is a great choice for a message queue because:</p>
<ul>
<li>it was built with <a href="/docs/en/2.9.1/concepts-architecture-overview#persistent-storage">persistent message storage</a> in mind</li>
<li>it offers automatic load balancing across <a href="/docs/en/2.9.1/reference-terminology#consumer">consumers</a> for messages on a topic (or custom load balancing if you wish)</li>
</ul>
<blockquote>
<p>You can use the same Pulsar installation to act as a real-time message bus and as a message queue if you wish (or just one or the other). You can set aside some topics for real-time purposes and other topics for message queue purposes (or use specific namespaces for either purpose if you wish).</p>
</blockquote>
<h1><a class="anchor" aria-hidden="true" id="client-configuration-changes"></a><a href="#client-configuration-changes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client configuration changes</h1>
<p>To use a Pulsar <a href="/docs/en/2.9.1/reference-terminology#topic">topic</a> as a message queue, you should distribute the receiver load on that topic across several consumers (the optimal number of consumers will depend on the load). Each consumer must:</p>
<ul>
<li><p>Establish a <a href="/docs/en/2.9.1/concepts-messaging#shared">shared subscription</a> and use the same subscription name as the other consumers (otherwise the subscription is not shared and the consumers can't act as a processing ensemble)</p></li>
<li><p>If you'd like to have tight control over message dispatching across consumers, set the consumers' <strong>receiver queue</strong> size very low (potentially even to 0 if necessary). Each Pulsar <a href="/docs/en/2.9.1/reference-terminology#consumer">consumer</a> has a receiver queue that determines how many messages the consumer will attempt to fetch at a time. A receiver queue of 1000 (the default), for example, means that the consumer will attempt to process 1000 messages from the topic's backlog upon connection. Setting the receiver queue to zero essentially means ensuring that each consumer is only doing one thing at a time.</p>
<p>The downside to restricting the receiver queue size of consumers is that that limits the potential throughput of those consumers and cannot be used with <a href="/docs/en/2.9.1/reference-terminology#partitioned-topic">partitioned topics</a>. Whether the performance/control trade-off is worthwhile will depend on your use case.</p></li>
</ul>
<h2><a class="anchor" aria-hidden="true" id="java-clients"></a><a href="#java-clients" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java clients</h2>
<p>Here's an example Java consumer configuration that uses a shared subscription:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.client.api.Consumer;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.PulsarClient;
<span class="hljs-keyword">import</span> org.apache.pulsar.client.api.SubscriptionType;
String SERVICE_URL = <span class="hljs-string">"pulsar://localhost:6650"</span>;
String TOPIC = <span class="hljs-string">"persistent://public/default/mq-topic-1"</span>;
String subscription = <span class="hljs-string">"sub-1"</span>;
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
Consumer consumer = client.newConsumer()
.topic(TOPIC)
.subscriptionName(subscription)
.subscriptionType(SubscriptionType.Shared)
<span class="hljs-comment">// If you'd like to restrict the receiver queue size</span>
.receiverQueueSize(<span class="hljs-number">10</span>)
.subscribe();
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="python-clients"></a><a href="#python-clients" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python clients</h2>
<p>Here's an example Python consumer configuration that uses a shared subscription:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Client, ConsumerType
SERVICE_URL = <span class="hljs-string">"pulsar://localhost:6650"</span>
TOPIC = <span class="hljs-string">"persistent://public/default/mq-topic-1"</span>
SUBSCRIPTION = <span class="hljs-string">"sub-1"</span>
client = Client(SERVICE_URL)
consumer = client.subscribe(
TOPIC,
SUBSCRIPTION,
<span class="hljs-comment"># If you'd like to restrict the receiver queue size</span>
receiver_queue_size=<span class="hljs-number">10</span>,
consumer_type=ConsumerType.Shared)
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="c-clients"></a><a href="#c-clients" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>C++ clients</h2>
<p>Here's an example C++ consumer configuration that uses a shared subscription:</p>
<pre><code class="hljs css language-cpp"><span class="hljs-meta">#<span class="hljs-meta-keyword">include</span> <span class="hljs-meta-string">&lt;pulsar/Client.h&gt;</span></span>
<span class="hljs-built_in">std</span>::<span class="hljs-built_in">string</span> serviceUrl = <span class="hljs-string">"pulsar://localhost:6650"</span>;
<span class="hljs-built_in">std</span>::<span class="hljs-built_in">string</span> topic = <span class="hljs-string">"persistent://public/defaultmq-topic-1"</span>;
<span class="hljs-built_in">std</span>::<span class="hljs-built_in">string</span> subscription = <span class="hljs-string">"sub-1"</span>;
<span class="hljs-function">Client <span class="hljs-title">client</span><span class="hljs-params">(serviceUrl)</span></span>;
ConsumerConfiguration consumerConfig;
consumerConfig.setConsumerType(ConsumerType.ConsumerShared);
<span class="hljs-comment">// If you'd like to restrict the receiver queue size</span>
consumerConfig.setReceiverQueueSize(<span class="hljs-number">10</span>);
Consumer consumer;
Result result = client.subscribe(topic, subscription, consumerConfig, consumer);
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="go-clients"></a><a href="#go-clients" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Go clients</h2>
<p>Here is an example of a Go consumer configuration that uses a shared subscription:</p>
<pre><code class="hljs css language-go"><span class="hljs-keyword">import</span> <span class="hljs-string">"github.com/apache/pulsar-client-go/pulsar"</span>
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: <span class="hljs-string">"pulsar://localhost:6650"</span>,
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
log.Fatal(err)
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: <span class="hljs-string">"persistent://public/default/mq-topic-1"</span>,
SubscriptionName: <span class="hljs-string">"sub-1"</span>,
Type: pulsar.Shared,
ReceiverQueueSize: <span class="hljs-number">10</span>, <span class="hljs-comment">// If you'd like to restrict the receiver queue size</span>
})
<span class="hljs-keyword">if</span> err != <span class="hljs-literal">nil</span> {
log.Fatal(err)
}
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.9.1/cookbooks-encryption"><span class="arrow-prev"></span><span>Encryption</span></a><a class="docs-next button" href="/docs/en/2.9.1/cookbooks-bookkeepermetadata"><span class="function-name-prevnext">BookKeeper Ledger Metadata</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#java-clients">Java clients</a></li><li><a href="#python-clients">Python clients</a></li><li><a href="#c-clients">C++ clients</a></li><li><a href="#go-clients">Go clients</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>