blob: 3c9979d41766eaeee02a1df9717aca6b2948028f [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar binary protocol specification · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency."/><meta name="docsearch:version" content="2.6.3"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar binary protocol specification · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.6.3</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.6.3/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.6.3/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.6.3/developing-binary-protocol">日本語</a></li><li><a href="/docs/fr/2.6.3/developing-binary-protocol">Français</a></li><li><a href="/docs/ko/2.6.3/developing-binary-protocol">한국어</a></li><li><a href="/docs/zh-CN/2.6.3/developing-binary-protocol">中文</a></li><li><a href="/docs/zh-TW/2.6.3/developing-binary-protocol">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Development</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/getting-started-helm">Run Pulsar in Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-extending">Extending</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/client-libraries-dotnet">C#</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-non-partitioned-topics">Non-Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/develop-tools">Simulation tools</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.6.3/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.3/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/developing-binary-protocol.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar binary protocol specification</h1></header><article><div><span><p>Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency.</p>
<p>Clients and brokers exchange <em>commands</em> with each other. Commands are formatted as binary <a href="https://developers.google.com/protocol-buffers/">protocol buffer</a> (aka <em>protobuf</em>) messages. The format of protobuf commands is specified in the <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto"><code>PulsarApi.proto</code></a> file and also documented in the <a href="#protobuf-interface">Protobuf interface</a> section below.</p>
<blockquote>
<h3><a class="anchor" aria-hidden="true" id="connection-sharing"></a><a href="#connection-sharing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connection sharing</h3>
<p>Commands for different producers and consumers can be interleaved and sent through the same connection without restriction.</p>
</blockquote>
<p>All commands associated with Pulsar's protocol are contained in a
<a href="#pulsar.proto.BaseCommand"><code>BaseCommand</code></a> protobuf message that includes a <a href="#pulsar.proto.Type"><code>Type</code></a> <a href="https://developers.google.com/protocol-buffers/docs/proto#enum">enum</a> with all possible subcommands as optional fields. <code>BaseCommand</code> messages can specify only one subcommand.</p>
<h2><a class="anchor" aria-hidden="true" id="framing"></a><a href="#framing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Framing</h2>
<p>Since protobuf doesn't provide any sort of message frame, all messages in the Pulsar protocol are prepended with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.</p>
<p>The Pulsar protocol allows for two types of commands:</p>
<ol>
<li><strong>Simple commands</strong> that do not carry a message payload.</li>
<li><strong>Payload commands</strong> that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf <a href="#message-metadata">metadata</a> and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big endian integers.</li>
</ol>
<blockquote>
<p>Message payloads are passed in raw format rather than protobuf format for efficiency reasons.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="simple-commands"></a><a href="#simple-commands" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Simple commands</h3>
<p>Simple (payload-free) commands have this basic structure:</p>
<table>
<thead>
<tr><th style="text-align:left">Component</th><th style="text-align:left">Description</th><th style="text-align:left">Size (in bytes)</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">totalSize</td><td style="text-align:left">The size of the frame, counting everything that comes after it (in bytes)</td><td style="text-align:left">4</td></tr>
<tr><td style="text-align:left">commandSize</td><td style="text-align:left">The size of the protobuf-serialized command</td><td style="text-align:left">4</td></tr>
<tr><td style="text-align:left">message</td><td style="text-align:left">The protobuf message serialized in a raw binary format (rather than in protobuf format)</td><td style="text-align:left"></td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="payload-commands"></a><a href="#payload-commands" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Payload commands</h3>
<p>Payload commands have this basic structure:</p>
<table>
<thead>
<tr><th style="text-align:left">Component</th><th style="text-align:left">Description</th><th style="text-align:left">Size (in bytes)</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">totalSize</td><td style="text-align:left">The size of the frame, counting everything that comes after it (in bytes)</td><td style="text-align:left">4</td></tr>
<tr><td style="text-align:left">commandSize</td><td style="text-align:left">The size of the protobuf-serialized command</td><td style="text-align:left">4</td></tr>
<tr><td style="text-align:left">message</td><td style="text-align:left">The protobuf message serialized in a raw binary format (rather than in protobuf format)</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">magicNumber</td><td style="text-align:left">A 2-byte byte array (<code>0x0e01</code>) identifying the current format</td><td style="text-align:left">2</td></tr>
<tr><td style="text-align:left">checksum</td><td style="text-align:left">A <a href="http://www.evanjones.ca/crc32c.html">CRC32-C checksum</a> of everything that comes after it</td><td style="text-align:left">4</td></tr>
<tr><td style="text-align:left">metadataSize</td><td style="text-align:left">The size of the message <a href="#message-metadata">metadata</a></td><td style="text-align:left">4</td></tr>
<tr><td style="text-align:left">metadata</td><td style="text-align:left">The message <a href="#message-metadata">metadata</a> stored as a binary protobuf message</td><td style="text-align:left"></td></tr>
<tr><td style="text-align:left">payload</td><td style="text-align:left">Anything left in the frame is considered the payload and can include any sequence of bytes</td><td style="text-align:left"></td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="message-metadata"></a><a href="#message-metadata" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Message metadata</h2>
<p>Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer.</p>
<table>
<thead>
<tr><th style="text-align:left">Field</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>producer_name</code></td><td style="text-align:left">The name of the producer that published the message</td></tr>
<tr><td style="text-align:left"><code>sequence_id</code></td><td style="text-align:left">The sequence ID of the message, assigned by producer</td></tr>
<tr><td style="text-align:left"><code>publish_time</code></td><td style="text-align:left">The publish timestamp in Unix time (i.e. as the number of milliseconds since January 1st, 1970 in UTC)</td></tr>
<tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">A sequence of key/value pairs (using the <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L32"><code>KeyValue</code></a> message). These are application-defined keys and values with no special meaning to Pulsar.</td></tr>
<tr><td style="text-align:left"><code>replicated_from</code> <em>(optional)</em></td><td style="text-align:left">Indicates that the message has been replicated and specifies the name of the <a href="/docs/en/2.6.3/reference-terminology#cluster">cluster</a> where the message was originally published</td></tr>
<tr><td style="text-align:left"><code>partition_key</code> <em>(optional)</em></td><td style="text-align:left">While publishing on a partition topic, if the key is present, the hash of the key is used to determine which partition to choose</td></tr>
<tr><td style="text-align:left"><code>compression</code> <em>(optional)</em></td><td style="text-align:left">Signals that payload has been compressed and with which compression library</td></tr>
<tr><td style="text-align:left"><code>uncompressed_size</code> <em>(optional)</em></td><td style="text-align:left">If compression is used, the producer must fill the uncompressed size field with the original payload size</td></tr>
<tr><td style="text-align:left"><code>num_messages_in_batch</code> <em>(optional)</em></td><td style="text-align:left">If this message is really a <a href="#batch-messages">batch</a> of multiple entries, this field must be set to the number of messages in the batch</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="batch-messages"></a><a href="#batch-messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Batch messages</h3>
<p>When using batch messages, the payload will be containing a list of entries,
each of them with its individual metadata, defined by the <code>SingleMessageMetadata</code>
object.</p>
<p>For a single batch, the payload format will look like this:</p>
<table>
<thead>
<tr><th style="text-align:left">Field</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">metadataSizeN</td><td style="text-align:left">The size of the single message metadata serialized Protobuf</td></tr>
<tr><td style="text-align:left">metadataN</td><td style="text-align:left">Single message metadata</td></tr>
<tr><td style="text-align:left">payloadN</td><td style="text-align:left">Message payload passed by application</td></tr>
</tbody>
</table>
<p>Each metadata field looks like this;</p>
<table>
<thead>
<tr><th style="text-align:left">Field</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">properties</td><td style="text-align:left">Application-defined properties</td></tr>
<tr><td style="text-align:left">partition key <em>(optional)</em></td><td style="text-align:left">Key to indicate the hashing to a particular partition</td></tr>
<tr><td style="text-align:left">payload_size</td><td style="text-align:left">Size of the payload for the single message in the batch</td></tr>
</tbody>
</table>
<p>When compression is enabled, the whole batch will be compressed at once.</p>
<h2><a class="anchor" aria-hidden="true" id="interactions"></a><a href="#interactions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Interactions</h2>
<h3><a class="anchor" aria-hidden="true" id="connection-establishment"></a><a href="#connection-establishment" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connection establishment</h3>
<p>After opening a TCP connection to a broker, typically on port 6650, the client
is responsible to initiate the session.</p>
<p><img src="/docs/assets/binary-protocol-connect.png" alt="Connect interaction"></p>
<p>After receiving a <code>Connected</code> response from the broker, the client can
consider the connection ready to use. Alternatively, if the broker doesn't
validate the client authentication, it will reply with an <code>Error</code> command and
close the TCP connection.</p>
<p>Example:</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandConnect</span> </span>{
<span class="hljs-string">"client_version"</span> : <span class="hljs-string">"Pulsar-Client-Java-v1.15.2"</span>,
<span class="hljs-string">"auth_method_name"</span> : <span class="hljs-string">"my-authentication-plugin"</span>,
<span class="hljs-string">"auth_data"</span> : <span class="hljs-string">"my-auth-data"</span>,
<span class="hljs-string">"protocol_version"</span> : <span class="hljs-number">6</span>
}
</code></pre>
<p>Fields:</p>
<ul>
<li><code>client_version</code> → String based identifier. Format is not enforced</li>
<li><code>auth_method_name</code><em>(optional)</em> Name of the authentication plugin if auth
enabled</li>
<li><code>auth_data</code><em>(optional)</em> Plugin specific authentication data</li>
<li><code>protocol_version</code> → Indicates the protocol version supported by the
client. Broker will not send commands introduced in newer revisions of the
protocol. Broker might be enforcing a minimum version</li>
</ul>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandConnected</span> </span>{
<span class="hljs-string">"server_version"</span> : <span class="hljs-string">"Pulsar-Broker-v1.15.2"</span>,
<span class="hljs-string">"protocol_version"</span> : <span class="hljs-number">6</span>
}
</code></pre>
<p>Fields:</p>
<ul>
<li><code>server_version</code> → String identifier of broker version</li>
<li><code>protocol_version</code> → Protocol version supported by the broker. Client
must not attempt to send commands introduced in newer revisions of the
protocol</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="keep-alive"></a><a href="#keep-alive" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Keep Alive</h3>
<p>To identify prolonged network partitions between clients and brokers or cases
in which a machine crashes without interrupting the TCP connection on the remote
end (eg: power outage, kernel panic, hard reboot...), we have introduced a
mechanism to probe for the availability status of the remote peer.</p>
<p>Both clients and brokers are sending <code>Ping</code> commands periodically and they will
close the socket if a <code>Pong</code> response is not received within a timeout (default
used by broker is 60s).</p>
<p>A valid implementation of a Pulsar client is not required to send the <code>Ping</code>
probe, though it is required to promptly reply after receiving one from the
broker in order to prevent the remote side from forcibly closing the TCP connection.</p>
<h3><a class="anchor" aria-hidden="true" id="producer"></a><a href="#producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer</h3>
<p>In order to send messages, a client needs to establish a producer. When creating
a producer, the broker will first verify that this particular client is
authorized to publish on the topic.</p>
<p>Once the client gets confirmation of the producer creation, it can publish
messages to the broker, referring to the producer id negotiated before.</p>
<p><img src="/docs/assets/binary-protocol-producer.png" alt="Producer interaction"></p>
<h5><a class="anchor" aria-hidden="true" id="command-producer"></a><a href="#command-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Producer</h5>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandProducer</span> </span>{
<span class="hljs-string">"topic"</span> : <span class="hljs-string">"persistent://my-property/my-cluster/my-namespace/my-topic"</span>,
<span class="hljs-string">"producer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>topic</code> → Complete topic name to where you want to create the producer on</li>
<li><code>producer_id</code> → Client generated producer identifier. Needs to be unique
within the same connection</li>
<li><code>request_id</code> → Identifier for this request. Used to match the response with
the originating request. Needs to be unique within the same connection</li>
<li><code>producer_name</code><em>(optional)</em> If a producer name is specified, the name will
be used, otherwise the broker will generate a unique name. Generated
producer name is guaranteed to be globally unique. Implementations are
expected to let the broker generate a new producer name when the producer
is initially created, then reuse it when recreating the producer after
reconnections.</li>
</ul>
<p>The broker will reply with either <code>ProducerSuccess</code> or <code>Error</code> commands.</p>
<h5><a class="anchor" aria-hidden="true" id="command-producersuccess"></a><a href="#command-producersuccess" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command ProducerSuccess</h5>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandProducerSuccess</span> </span>{
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"producer_name"</span> : <span class="hljs-string">"generated-unique-producer-name"</span>
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>request_id</code> → Original id of the <code>CreateProducer</code> request</li>
<li><code>producer_name</code> → Generated globally unique producer name or the name
specified by the client, if any.</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-send"></a><a href="#command-send" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Send</h5>
<p>Command <code>Send</code> is used to publish a new message within the context of an
already existing producer. This command is used in a frame that includes command
as well as message payload, for which the complete format is specified in the
<a href="#payload-commands">payload commands</a> section.</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandSend</span> </span>{
<span class="hljs-string">"producer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"sequence_id"</span> : <span class="hljs-number">0</span>,
<span class="hljs-string">"num_messages"</span> : <span class="hljs-number">1</span>
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>producer_id</code> → id of an existing producer</li>
<li><code>sequence_id</code> → each message has an associated sequence id which is expected
to be implemented with a counter starting at 0. The <code>SendReceipt</code> that
acknowledges the effective publishing of a messages will refer to it by
its sequence id.</li>
<li><code>num_messages</code><em>(optional)</em> Used when publishing a batch of messages at
once.</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-sendreceipt"></a><a href="#command-sendreceipt" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command SendReceipt</h5>
<p>After a message has been persisted on the configured number of replicas, the
broker will send the acknowledgment receipt to the producer.</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandSendReceipt</span> </span>{
<span class="hljs-string">"producer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"sequence_id"</span> : <span class="hljs-number">0</span>,
<span class="hljs-string">"message_id"</span> : {
<span class="hljs-string">"ledgerId"</span> : <span class="hljs-number">123</span>,
<span class="hljs-string">"entryId"</span> : <span class="hljs-number">456</span>
}
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>producer_id</code> → id of producer originating the send request</li>
<li><code>sequence_id</code> → sequence id of the published message</li>
<li><code>message_id</code> → message id assigned by the system to the published message
Unique within a single cluster. Message id is composed of 2 longs, <code>ledgerId</code>
and <code>entryId</code>, that reflect that this unique id is assigned when appending
to a BookKeeper ledger</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-closeproducer"></a><a href="#command-closeproducer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command CloseProducer</h5>
<p><strong>Note</strong>: <em>This command can be sent by either producer or broker</em>.</p>
<p>When receiving a <code>CloseProducer</code> command, the broker will stop accepting any
more messages for the producer, wait until all pending messages are persisted
and then reply <code>Success</code> to the client.</p>
<p>The broker can send a <code>CloseProducer</code> command to client when it's performing
a graceful failover (eg: broker is being restarted, or the topic is being unloaded
by load balancer to be transferred to a different broker).</p>
<p>When receiving the <code>CloseProducer</code>, the client is expected to go through the
service discovery lookup again and recreate the producer again. The TCP
connection is not affected.</p>
<h3><a class="anchor" aria-hidden="true" id="consumer"></a><a href="#consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer</h3>
<p>A consumer is used to attach to a subscription and consume messages from it.
After every reconnection, a client needs to subscribe to the topic. If a
subscription is not already there, a new one will be created.</p>
<p><img src="/docs/assets/binary-protocol-consumer.png" alt="Consumer"></p>
<h4><a class="anchor" aria-hidden="true" id="flow-control"></a><a href="#flow-control" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Flow control</h4>
<p>After the consumer is ready, the client needs to <em>give permission</em> to the
broker to push messages. This is done with the <code>Flow</code> command.</p>
<p>A <code>Flow</code> command gives additional <em>permits</em> to send messages to the consumer.
A typical consumer implementation will use a queue to accumulate these messages
before the application is ready to consume them.</p>
<p>After the application has dequeued half of the messages in the queue, the consumer
sends permits to the broker to ask for more messages (equals to half of the messages in the queue).</p>
<p>For example, if the queue size is 1000 and the consumer consumes 500 messages in the queue.
Then the consumer sends permits to the broker to ask for 500 messages.</p>
<h5><a class="anchor" aria-hidden="true" id="command-subscribe"></a><a href="#command-subscribe" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Subscribe</h5>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandSubscribe</span> </span>{
<span class="hljs-string">"topic"</span> : <span class="hljs-string">"persistent://my-property/my-cluster/my-namespace/my-topic"</span>,
<span class="hljs-string">"subscription"</span> : <span class="hljs-string">"my-subscription-name"</span>,
<span class="hljs-string">"subType"</span> : <span class="hljs-string">"Exclusive"</span>,
<span class="hljs-string">"consumer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>topic</code> → Complete topic name to where you want to create the consumer on</li>
<li><code>subscription</code> → Subscription name</li>
<li><code>subType</code> → Subscription type: Exclusive, Shared, Failover, Key_Shared</li>
<li><code>consumer_id</code> → Client generated consumer identifier. Needs to be unique
within the same connection</li>
<li><code>request_id</code> → Identifier for this request. Used to match the response with
the originating request. Needs to be unique within the same connection</li>
<li><code>consumer_name</code><em>(optional)</em> Clients can specify a consumer name. This
name can be used to track a particular consumer in the stats. Also, in
Failover subscription type, the name is used to decide which consumer is
elected as <em>master</em> (the one receiving messages): consumers are sorted by
their consumer name and the first one is elected master.</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-flow"></a><a href="#command-flow" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Flow</h5>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandFlow</span> </span>{
<span class="hljs-string">"consumer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"messagePermits"</span> : <span class="hljs-number">1000</span>
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>consumer_id</code> → Id of an already established consumer</li>
<li><code>messagePermits</code> → Number of additional permits to grant to the broker for
pushing more messages</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-message"></a><a href="#command-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Message</h5>
<p>Command <code>Message</code> is used by the broker to push messages to an existing consumer,
within the limits of the given permits.</p>
<p>This command is used in a frame that includes the message payload as well, for
which the complete format is specified in the <a href="#payload-commands">payload commands</a>
section.</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandMessage</span> </span>{
<span class="hljs-string">"consumer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"message_id"</span> : {
<span class="hljs-string">"ledgerId"</span> : <span class="hljs-number">123</span>,
<span class="hljs-string">"entryId"</span> : <span class="hljs-number">456</span>
}
}
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="command-ack"></a><a href="#command-ack" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Ack</h5>
<p>An <code>Ack</code> is used to signal to the broker that a given message has been
successfully processed by the application and can be discarded by the broker.</p>
<p>In addition, the broker will also maintain the consumer position based on the
acknowledged messages.</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandAck</span> </span>{
<span class="hljs-string">"consumer_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"ack_type"</span> : <span class="hljs-string">"Individual"</span>,
<span class="hljs-string">"message_id"</span> : {
<span class="hljs-string">"ledgerId"</span> : <span class="hljs-number">123</span>,
<span class="hljs-string">"entryId"</span> : <span class="hljs-number">456</span>
}
}
</code></pre>
<p>Parameters:</p>
<ul>
<li><code>consumer_id</code> → Id of an already established consumer</li>
<li><code>ack_type</code> → Type of acknowledgment: <code>Individual</code> or <code>Cumulative</code></li>
<li><code>message_id</code> → Id of the message to acknowledge</li>
<li><code>validation_error</code><em>(optional)</em> Indicates that the consumer has discarded
the messages due to: <code>UncompressedSizeCorruption</code>,
<code>DecompressionError</code>, <code>ChecksumMismatch</code>, <code>BatchDeSerializeError</code></li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-closeconsumer"></a><a href="#command-closeconsumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command CloseConsumer</h5>
<p><strong><em>Note</em></strong>: <em>This command can be sent by either producer or broker</em>.</p>
<p>This command behaves the same as <a href="#command-closeproducer"><code>CloseProducer</code></a></p>
<h5><a class="anchor" aria-hidden="true" id="command-redeliverunacknowledgedmessages"></a><a href="#command-redeliverunacknowledgedmessages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command RedeliverUnacknowledgedMessages</h5>
<p>A consumer can ask the broker to redeliver some or all of the pending messages
that were pushed to that particular consumer and not yet acknowledged.</p>
<p>The protobuf object accepts a list of message ids that the consumer wants to
be redelivered. If the list is empty, the broker will redeliver all the
pending messages.</p>
<p>On redelivery, messages can be sent to the same consumer or, in the case of a
shared subscription, spread across all available consumers.</p>
<h5><a class="anchor" aria-hidden="true" id="command-reachedendoftopic"></a><a href="#command-reachedendoftopic" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command ReachedEndOfTopic</h5>
<p>This is sent by a broker to a particular consumer, whenever the topic
has been &quot;terminated&quot; and all the messages on the subscription were
acknowledged.</p>
<p>The client should use this command to notify the application that no more
messages are coming from the consumer.</p>
<h5><a class="anchor" aria-hidden="true" id="command-consumerstats"></a><a href="#command-consumerstats" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command ConsumerStats</h5>
<p>This command is sent by the client to retrieve Subscriber and Consumer level
stats from the broker.
Parameters:</p>
<ul>
<li><code>request_id</code> → Id of the request, used to correlate the request
and the response.</li>
<li><code>consumer_id</code> → Id of an already established consumer.</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-consumerstatsresponse"></a><a href="#command-consumerstatsresponse" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command ConsumerStatsResponse</h5>
<p>This is the broker's response to ConsumerStats request by the client.
It contains the Subscriber and Consumer level stats of the <code>consumer_id</code> sent in the request.
If the <code>error_code</code> or the <code>error_message</code> field is set it indicates that the request has failed.</p>
<h5><a class="anchor" aria-hidden="true" id="command-unsubscribe"></a><a href="#command-unsubscribe" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command Unsubscribe</h5>
<p>This command is sent by the client to unsubscribe the <code>consumer_id</code> from the associated topic.
Parameters:</p>
<ul>
<li><code>request_id</code> → Id of the request.</li>
<li><code>consumer_id</code> → Id of an already established consumer which needs to unsubscribe.</li>
</ul>
<h2><a class="anchor" aria-hidden="true" id="service-discovery"></a><a href="#service-discovery" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Service discovery</h2>
<h3><a class="anchor" aria-hidden="true" id="topic-lookup"></a><a href="#topic-lookup" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Topic lookup</h3>
<p>Topic lookup needs to be performed each time a client needs to create or
reconnect a producer or a consumer. Lookup is used to discover which particular
broker is serving the topic we are about to use.</p>
<p>Lookup can be done with a REST call as described in the
<a href="/docs/en/2.6.3/admin-api-persistent-topics#lookup-of-topic">admin API</a>
docs.</p>
<p>Since Pulsar-1.16 it is also possible to perform the lookup within the binary
protocol.</p>
<p>For the sake of example, let's assume we have a service discovery component
running at <code>pulsar://broker.example.com:6650</code></p>
<p>Individual brokers will be running at <code>pulsar://broker-1.example.com:6650</code>,
<code>pulsar://broker-2.example.com:6650</code>, ...</p>
<p>A client can use a connection to the discovery service host to issue a
<code>LookupTopic</code> command. The response can either be a broker hostname to
connect to, or a broker hostname to which retry the lookup.</p>
<p>The <code>LookupTopic</code> command has to be used in a connection that has already
gone through the <code>Connect</code> / <code>Connected</code> initial handshake.</p>
<p><img src="/docs/assets/binary-protocol-topic-lookup.png" alt="Topic lookup"></p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandLookupTopic</span> </span>{
<span class="hljs-string">"topic"</span> : <span class="hljs-string">"persistent://my-property/my-cluster/my-namespace/my-topic"</span>,
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"authoritative"</span> : <span class="hljs-literal">false</span>
}
</code></pre>
<p>Fields:</p>
<ul>
<li><code>topic</code> → Topic name to lookup</li>
<li><code>request_id</code> → Id of the request that will be passed with its response</li>
<li><code>authoritative</code> → Initial lookup request should use false. When following a
redirect response, client should pass the same value contained in the
response</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="lookuptopicresponse"></a><a href="#lookuptopicresponse" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>LookupTopicResponse</h5>
<p>Example of response with successful lookup:</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandLookupTopicResponse</span> </span>{
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"response"</span> : <span class="hljs-string">"Connect"</span>,
<span class="hljs-string">"brokerServiceUrl"</span> : <span class="hljs-string">"pulsar://broker-1.example.com:6650"</span>,
<span class="hljs-string">"brokerServiceUrlTls"</span> : <span class="hljs-string">"pulsar+ssl://broker-1.example.com:6651"</span>,
<span class="hljs-string">"authoritative"</span> : <span class="hljs-literal">true</span>
}
</code></pre>
<p>Example of lookup response with redirection:</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandLookupTopicResponse</span> </span>{
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"response"</span> : <span class="hljs-string">"Redirect"</span>,
<span class="hljs-string">"brokerServiceUrl"</span> : <span class="hljs-string">"pulsar://broker-2.example.com:6650"</span>,
<span class="hljs-string">"brokerServiceUrlTls"</span> : <span class="hljs-string">"pulsar+ssl://broker-2.example.com:6651"</span>,
<span class="hljs-string">"authoritative"</span> : <span class="hljs-literal">true</span>
}
</code></pre>
<p>In this second case, we need to reissue the <code>LookupTopic</code> command request
to <code>broker-2.example.com</code> and this broker will be able to give a definitive
answer to the lookup request.</p>
<h3><a class="anchor" aria-hidden="true" id="partitioned-topics-discovery"></a><a href="#partitioned-topics-discovery" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Partitioned topics discovery</h3>
<p>Partitioned topics metadata discovery is used to find out if a topic is a
&quot;partitioned topic&quot; and how many partitions were set up.</p>
<p>If the topic is marked as &quot;partitioned&quot;, the client is expected to create
multiple producers or consumers, one for each partition, using the <code>partition-X</code>
suffix.</p>
<p>This information only needs to be retrieved the first time a producer or
consumer is created. There is no need to do this after reconnections.</p>
<p>The discovery of partitioned topics metadata works very similar to the topic
lookup. The client send a request to the service discovery address and the
response will contain actual metadata.</p>
<h5><a class="anchor" aria-hidden="true" id="command-partitionedtopicmetadata"></a><a href="#command-partitionedtopicmetadata" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command PartitionedTopicMetadata</h5>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandPartitionedTopicMetadata</span> </span>{
<span class="hljs-string">"topic"</span> : <span class="hljs-string">"persistent://my-property/my-cluster/my-namespace/my-topic"</span>,
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>
}
</code></pre>
<p>Fields:</p>
<ul>
<li><code>topic</code> → the topic for which to check the partitions metadata</li>
<li><code>request_id</code> → Id of the request that will be passed with its response</li>
</ul>
<h5><a class="anchor" aria-hidden="true" id="command-partitionedtopicmetadataresponse"></a><a href="#command-partitionedtopicmetadataresponse" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Command PartitionedTopicMetadataResponse</h5>
<p>Example of response with metadata:</p>
<pre><code class="hljs css language-protobuf"><span class="hljs-class"><span class="hljs-keyword">message</span> <span class="hljs-title">CommandPartitionedTopicMetadataResponse</span> </span>{
<span class="hljs-string">"request_id"</span> : <span class="hljs-number">1</span>,
<span class="hljs-string">"response"</span> : <span class="hljs-string">"Success"</span>,
<span class="hljs-string">"partitions"</span> : <span class="hljs-number">32</span>
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="protobuf-interface"></a><a href="#protobuf-interface" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Protobuf interface</h2>
<p>All Pulsar's Protobuf definitions can be found <a href="https://github.com/apache/pulsar/tree/master//pulsar-common/src/main/proto/PulsarApi.proto">here</a>
.</p>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.6.3/develop-tools"><span class="arrow-prev"></span><span>Simulation tools</span></a><a class="docs-next button" href="/docs/en/2.6.3/develop-schema"><span>Next</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#framing">Framing</a><ul class="toc-headings"><li><a href="#simple-commands">Simple commands</a></li><li><a href="#payload-commands">Payload commands</a></li></ul></li><li><a href="#message-metadata">Message metadata</a><ul class="toc-headings"><li><a href="#batch-messages">Batch messages</a></li></ul></li><li><a href="#interactions">Interactions</a><ul class="toc-headings"><li><a href="#connection-establishment">Connection establishment</a></li><li><a href="#keep-alive">Keep Alive</a></li><li><a href="#producer">Producer</a></li><li><a href="#consumer">Consumer</a></li></ul></li><li><a href="#service-discovery">Service discovery</a><ul class="toc-headings"><li><a href="#topic-lookup">Topic lookup</a></li><li><a href="#partitioned-topics-discovery">Partitioned topics discovery</a></li></ul></li><li><a href="#protobuf-interface">Protobuf interface</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>