blob: 7773c9e0143c5c21ee33ce5a3e18a6b56d4c2159 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar&#x27;s WebSocket API · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar&#x27;s [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) API is meant to provide a simple way to interact with Pulsar using languages that do not have an official [client library](/docs/en/2.6.0/client-libraries). Through WebSockets you can publish and consume messages and use all the features available in the [Java](/docs/en/2.6.0/client-libraries-java), [Go](/docs/en/2.6.0/client-libraries-go), [Python](/docs/en/2.6.0/client-libraries-python) and [C++](/docs/en/2.6.0/client-libraries-cpp) client libraries."/><meta name="docsearch:version" content="2.6.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar&#x27;s WebSocket API · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar&#x27;s [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) API is meant to provide a simple way to interact with Pulsar using languages that do not have an official [client library](/docs/en/2.6.0/client-libraries). 
Through WebSockets you can publish and consume messages and use all the features available in the [Java](/docs/en/2.6.0/client-libraries-java), [Go](/docs/en/2.6.0/client-libraries-go), [Python](/docs/en/2.6.0/client-libraries-python) and [C++](/docs/en/2.6.0/client-libraries-cpp) client libraries."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.6.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.6.0/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.6.0/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" 
target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.6.0/client-libraries-websocket">日本語</a></li><li><a href="/docs/fr/2.6.0/client-libraries-websocket">Français</a></li><li><a href="/docs/ko/2.6.0/client-libraries-websocket">한국어</a></li><li><a href="/docs/zh-CN/2.6.0/client-libraries-websocket">中文</a></li><li><a href="/docs/zh-TW/2.6.0/client-libraries-websocket">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language picker: clicking the "languages-menu" link flips the
// "languages-dropdown" element between the "hide" and "visible" classes.
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
  // The menu entry is an <a href="#">; cancel its default navigation.
  event.preventDefault();
  // The whole className is replaced, so the dropdown carries exactly one
  // of the two state classes at any time.
  languagesDropDown.className =
    languagesDropDown.className == "hide" ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/getting-started-helm">Run Pulsar in Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.6.0/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/window-functions-context">Window Functions: 
Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.6.0/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-tls-authentication">Authentication 
using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-extending">Extending</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/client-libraries-node">Node.js</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.6.0/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.6.0/client-libraries-dotnet">C#</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-non-partitioned-topics">Non-Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.6.0/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.6.0/reference-configuration">Pulsar configuration</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.6.0/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
// Collapsible sidebar categories.
//
// For every element with class `collapsible`:
//   * on load, the first one whose following element contains a
//     `.navListItemActive` descendant gets its `hide` class toggled (and its
//     arrow's `rotate` class toggled);
//   * a click on the header toggles the same two classes.
//
// The header's next element sibling is treated as the collapsible content and
// its second child node (childNodes[1]) as the arrow indicator.
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
  var content = coll[i].nextElementSibling;
  // A header with no following element has nothing to collapse. Skip it
  // instead of dereferencing null: an uncaught TypeError here would abort the
  // remainder of this inline script.
  if (!content) {
    continue;
  }
  // querySelector stops at the first match, replacing a manual scan over
  // every descendant element.
  if (checkActiveCategory && content.querySelector('.navListItemActive')) {
    content.classList.toggle('hide');
    var activeArrow = coll[i].childNodes[1];
    // childNodes[1] may be missing or a text node (no classList).
    if (activeArrow && activeArrow.classList) {
      activeArrow.classList.toggle('rotate');
    }
    // Only the first matching category is handled on load.
    checkActiveCategory = false;
  }
  coll[i].addEventListener('click', function() {
    var arrow = this.childNodes[1];
    if (arrow && arrow.classList) {
      arrow.classList.toggle('rotate');
    }
    var panel = this.nextElementSibling;
    if (panel) {
      panel.classList.toggle('hide');
    }
  });
}
document.addEventListener('DOMContentLoaded', function() {
  /**
   * Wires a click on the first element matching `togglerSelector` so that it
   * toggles `className` on the first element matching `targetSelector`.
   * Does nothing when no toggler element is found.
   */
  function createToggler(togglerSelector, targetSelector, className) {
    var toggler = document.querySelector(togglerSelector);
    var target = document.querySelector(targetSelector);
    if (toggler) {
      // Assigned via `onclick`, so it replaces any handler set the same way.
      toggler.onclick = function(event) {
        event.preventDefault();
        target.classList.toggle(className);
      };
    }
  }

  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');

  var headings = document.querySelector('.toc-headings');
  if (!headings) {
    return;
  }
  // When a click lands on (or inside) an <a> within `.toc-headings`, drop the
  // `tocActive` class from <body>.
  headings.addEventListener('click', function(event) {
    // Walk up from the clicked node until reaching the container itself.
    for (var el = event.target; el !== headings; el = el.parentNode) {
      if (el.tagName === 'A') {
        document.body.classList.remove('tocActive');
        break;
      }
    }
  }, false);
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-websocket.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar&#x27;s WebSocket API</h1></header><article><div><span><p>Pulsar's <a href="https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API">WebSocket</a> API is meant to provide a simple way to interact with Pulsar using languages that do not have an official <a href="/docs/en/2.6.0/client-libraries">client library</a>. Through WebSockets you can publish and consume messages and use all the features available in the <a href="/docs/en/2.6.0/client-libraries-java">Java</a>, <a href="/docs/en/2.6.0/client-libraries-go">Go</a>, <a href="/docs/en/2.6.0/client-libraries-python">Python</a> and <a href="/docs/en/2.6.0/client-libraries-cpp">C++</a> client libraries.</p>
<blockquote>
<p>You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js <a href="#client-examples">below</a>.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="running-the-websocket-service"></a><a href="#running-the-websocket-service" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Running the WebSocket service</h2>
<p>The standalone variant of Pulsar that we recommend using for <a href="/docs/en/2.6.0/getting-started-standalone">local development</a> already has the WebSocket service enabled.</p>
<p>In non-standalone mode, there are two ways to deploy the WebSocket service:</p>
<ul>
<li><a href="#embedded-with-a-pulsar-broker">embedded</a> with a Pulsar broker</li>
<li>as a <a href="#as-a-separate-component">separate component</a></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="embedded-with-a-pulsar-broker"></a><a href="#embedded-with-a-pulsar-broker" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Embedded with a Pulsar broker</h3>
<p>In this mode, the WebSocket service will run within the same HTTP service that's already running in the broker. To enable this mode, set the <a href="/docs/en/2.6.0/reference-configuration#broker-webSocketServiceEnabled"><code>webSocketServiceEnabled</code></a> parameter in the <a href="/docs/en/2.6.0/reference-configuration#broker"><code>conf/broker.conf</code></a> configuration file in your installation.</p>
<pre><code class="hljs css language-properties"><span class="hljs-attr">webSocketServiceEnabled</span>=<span class="hljs-string">true</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="as-a-separate-component"></a><a href="#as-a-separate-component" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>As a separate component</h3>
<p>In this mode, the WebSocket service runs as its own service, separate from the Pulsar <a href="/docs/en/2.6.0/reference-terminology#broker">broker</a>. Configuration for this mode is handled in the <a href="/docs/en/2.6.0/reference-configuration#websocket"><code>conf/websocket.conf</code></a> configuration file. You'll need to set <em>at least</em> the following parameters:</p>
<ul>
<li><a href="/docs/en/2.6.0/reference-configuration#websocket-configurationStoreServers"><code>configurationStoreServers</code></a></li>
<li><a href="/docs/en/2.6.0/reference-configuration#websocket-webServicePort"><code>webServicePort</code></a></li>
<li><a href="/docs/en/2.6.0/reference-configuration#websocket-clusterName"><code>clusterName</code></a></li>
</ul>
<p>Here's an example:</p>
<pre><code class="hljs css language-properties"><span class="hljs-attr">configurationStoreServers</span>=<span class="hljs-string">zk1:2181,zk2:2181,zk3:2181</span>
<span class="hljs-attr">webServicePort</span>=<span class="hljs-string">8080</span>
<span class="hljs-attr">clusterName</span>=<span class="hljs-string">my-cluster</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="starting-the-broker"></a><a href="#starting-the-broker" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Starting the broker</h3>
<p>When the configuration is set, you can start the service using the <a href="/docs/en/2.6.0/reference-cli-tools#pulsar-daemon"><code>pulsar-daemon</code></a> tool:</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> bin/pulsar-daemon start websocket</span>
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="api-reference"></a><a href="#api-reference" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>API Reference</h2>
<p>Pulsar's WebSocket API offers three endpoints for <a href="#producer-endpoint">producing</a> messages, <a href="#consumer-endpoint">consuming</a> messages and <a href="#reader-endpoint">reading</a> messages.</p>
<p>All exchanges via the WebSocket API use JSON.</p>
<h3><a class="anchor" aria-hidden="true" id="producer-endpoint"></a><a href="#producer-endpoint" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer endpoint</h3>
<p>The producer endpoint requires you to specify a tenant, namespace, and topic in the URL:</p>
<pre><code class="hljs css language-http">ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="query-param"></a><a href="#query-param" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query param</h5>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>sendTimeoutMillis</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Send timeout (default: 30 secs)</td></tr>
<tr><td style="text-align:left"><code>batchingEnabled</code></td><td style="text-align:left">boolean</td><td style="text-align:left">no</td><td style="text-align:left">Enable batching of messages (default: false)</td></tr>
<tr><td style="text-align:left"><code>batchingMaxMessages</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Maximum number of messages permitted in a batch (default: 1000)</td></tr>
<tr><td style="text-align:left"><code>maxPendingMessages</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Maximum size of the internal queue holding the messages (default: 1000)</td></tr>
<tr><td style="text-align:left"><code>batchingMaxPublishDelay</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Time period within which the messages will be batched (default: 10ms)</td></tr>
<tr><td style="text-align:left"><code>messageRoutingMode</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Message <a href="https://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/index.html?org/apache/pulsar/client/api/ProducerConfiguration.MessageRoutingMode.html">routing mode</a> for the partitioned producer: <code>SinglePartition</code>, <code>RoundRobinPartition</code></td></tr>
<tr><td style="text-align:left"><code>compressionType</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Compression <a href="https://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/index.html?org/apache/pulsar/client/api/CompressionType.html">type</a>: <code>LZ4</code>, <code>ZLIB</code></td></tr>
<tr><td style="text-align:left"><code>producerName</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Specify the name for the producer. Pulsar enforces that only one producer with a given name can publish on a topic at a time.</td></tr>
<tr><td style="text-align:left"><code>initialSequenceId</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Set the baseline for the sequence ids for messages published by the producer.</td></tr>
<tr><td style="text-align:left"><code>hashingScheme</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left"><a href="https://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/org/apache/pulsar/client/api/ProducerConfiguration.HashingScheme.html">Hashing function</a> to use when publishing on a partitioned topic: <code>JavaStringHash</code>, <code>Murmur3_32Hash</code></td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="publishing-a-message"></a><a href="#publishing-a-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Publishing a message</h4>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"payload"</span>: <span class="hljs-string">"SGVsbG8gV29ybGQ="</span>,
<span class="hljs-attr">"properties"</span>: {<span class="hljs-attr">"key1"</span>: <span class="hljs-string">"value1"</span>, <span class="hljs-attr">"key2"</span>: <span class="hljs-string">"value2"</span>},
<span class="hljs-attr">"context"</span>: <span class="hljs-string">"1"</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>payload</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Base-64 encoded payload</td></tr>
<tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">key-value pairs</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined properties</td></tr>
<tr><td style="text-align:left"><code>context</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined request identifier</td></tr>
<tr><td style="text-align:left"><code>key</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">For partitioned topics, decides which partition to use</td></tr>
<tr><td style="text-align:left"><code>replicationClusters</code></td><td style="text-align:left">array</td><td style="text-align:left">no</td><td style="text-align:left">Restrict replication to this list of <a href="/docs/en/2.6.0/reference-terminology#cluster">clusters</a>, specified by name</td></tr>
</tbody>
</table>
<h5><a class="anchor" aria-hidden="true" id="example-success-response"></a><a href="#example-success-response" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example success response</h5>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"result"</span>: <span class="hljs-string">"ok"</span>,
<span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>,
<span class="hljs-attr">"context"</span>: <span class="hljs-string">"1"</span>
}
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="example-failure-response"></a><a href="#example-failure-response" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example failure response</h5>
<pre><code class="hljs css language-json"> {
<span class="hljs-attr">"result"</span>: <span class="hljs-string">"send-error:3"</span>,
<span class="hljs-attr">"errorMsg"</span>: <span class="hljs-string">"Failed to de-serialize from JSON"</span>,
<span class="hljs-attr">"context"</span>: <span class="hljs-string">"1"</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>result</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left"><code>ok</code> if successful or an error message if unsuccessful</td></tr>
<tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID assigned to the published message</td></tr>
<tr><td style="text-align:left"><code>context</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined request identifier</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="consumer-endpoint"></a><a href="#consumer-endpoint" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer endpoint</h3>
<p>The consumer endpoint requires you to specify a tenant, namespace, and topic, as well as a subscription, in the URL:</p>
<pre><code class="hljs css language-http">ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="query-param-1"></a><a href="#query-param-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query param</h5>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>ackTimeoutMillis</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Set the timeout for unacked messages (default: 0)</td></tr>
<tr><td style="text-align:left"><code>subscriptionType</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left"><a href="https://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/index.html?org/apache/pulsar/client/api/SubscriptionType.html">Subscription type</a>: <code>Exclusive</code>, <code>Failover</code>, <code>Shared</code>, <code>Key_Shared</code></td></tr>
<tr><td style="text-align:left"><code>receiverQueueSize</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Size of the consumer receive queue (default: 1000)</td></tr>
<tr><td style="text-align:left"><code>consumerName</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Consumer name</td></tr>
<tr><td style="text-align:left"><code>priorityLevel</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Define a <a href="http://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-">priority</a> for the consumer</td></tr>
<tr><td style="text-align:left"><code>maxRedeliverCount</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Define a <a href="http://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-">maxRedeliverCount</a> for the consumer (default: 0). Activates <a href="https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic">Dead Letter Topic</a> feature.</td></tr>
<tr><td style="text-align:left"><code>deadLetterTopic</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Define a <a href="http://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-">deadLetterTopic</a> for the consumer (default: {topic}-{subscription}-DLQ). Activates <a href="https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic">Dead Letter Topic</a> feature.</td></tr>
<tr><td style="text-align:left"><code>pullMode</code></td><td style="text-align:left">boolean</td><td style="text-align:left">no</td><td style="text-align:left">Enable pull mode (default: false). See &quot;Flow Control&quot; below.</td></tr>
</tbody>
</table>
<p>NB: these parameters (except <code>pullMode</code>) apply to the internal consumer of the WebSocket service.
So messages will be subject to the redelivery settings as soon as they get into the receive queue,
even if the client doesn't consume on the WebSocket.</p>
<h5><a class="anchor" aria-hidden="true" id="receiving-messages"></a><a href="#receiving-messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Receiving messages</h5>
<p>The server will push messages on the WebSocket session:</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>,
<span class="hljs-attr">"payload"</span>: <span class="hljs-string">"SGVsbG8gV29ybGQ="</span>,
<span class="hljs-attr">"properties"</span>: {<span class="hljs-attr">"key1"</span>: <span class="hljs-string">"value1"</span>, <span class="hljs-attr">"key2"</span>: <span class="hljs-string">"value2"</span>},
<span class="hljs-attr">"publishTime"</span>: <span class="hljs-string">"2016-08-30 16:45:57.785"</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID</td></tr>
<tr><td style="text-align:left"><code>payload</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Base-64 encoded payload</td></tr>
<tr><td style="text-align:left"><code>publishTime</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Publish timestamp</td></tr>
<tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">key-value pairs</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined properties</td></tr>
<tr><td style="text-align:left"><code>key</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Original routing key set by producer</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="acknowledging-the-message"></a><a href="#acknowledging-the-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Acknowledging the message</h4>
<p>The consumer needs to acknowledge the successful processing of the message to
have the Pulsar broker delete it.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID of the processed message</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="flow-control"></a><a href="#flow-control" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Flow control</h4>
<h5><a class="anchor" aria-hidden="true" id="push-mode"></a><a href="#push-mode" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Push Mode</h5>
<p>By default (<code>pullMode=false</code>), the consumer endpoint will use the <code>receiverQueueSize</code> parameter both to size its
internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client.
In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching
<code>receiverQueueSize</code> unacked messages sent to the WebSocket client.</p>
<h5><a class="anchor" aria-hidden="true" id="pull-mode"></a><a href="#pull-mode" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pull Mode</h5>
<p>If you set <code>pullMode</code> to <code>true</code>, the WebSocket client will need to send <code>permit</code> commands to permit the
Pulsar WebSocket service to send more messages.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"type"</span>: <span class="hljs-string">"permit"</span>,
<span class="hljs-attr">"permitMessages"</span>: <span class="hljs-number">100</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>type</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Type of command. Must be <code>permit</code></td></tr>
<tr><td style="text-align:left"><code>permitMessages</code></td><td style="text-align:left">int</td><td style="text-align:left">yes</td><td style="text-align:left">Number of messages to permit</td></tr>
</tbody>
</table>
<p>NB: in this mode it's possible to acknowledge messages in a different connection.</p>
<h3><a class="anchor" aria-hidden="true" id="reader-endpoint"></a><a href="#reader-endpoint" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader endpoint</h3>
<p>The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:</p>
<pre><code class="hljs css language-http">ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="query-param-2"></a><a href="#query-param-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query param</h5>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>readerName</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Reader name</td></tr>
<tr><td style="text-align:left"><code>receiverQueueSize</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Size of the consumer receive queue (default: 1000)</td></tr>
<tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">int or enum</td><td style="text-align:left">no</td><td style="text-align:left">Message ID to start from, <code>earliest</code> or <code>latest</code> (default: <code>latest</code>)</td></tr>
</tbody>
</table>
<h5><a class="anchor" aria-hidden="true" id="receiving-messages-1"></a><a href="#receiving-messages-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Receiving messages</h5>
<p>The server will push messages on the WebSocket session:</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>,
<span class="hljs-attr">"payload"</span>: <span class="hljs-string">"SGVsbG8gV29ybGQ="</span>,
<span class="hljs-attr">"properties"</span>: {<span class="hljs-attr">"key1"</span>: <span class="hljs-string">"value1"</span>, <span class="hljs-attr">"key2"</span>: <span class="hljs-string">"value2"</span>},
<span class="hljs-attr">"publishTime"</span>: <span class="hljs-string">"2016-08-30 16:45:57.785"</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID</td></tr>
<tr><td style="text-align:left"><code>payload</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Base-64 encoded payload</td></tr>
<tr><td style="text-align:left"><code>publishTime</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Publish timestamp</td></tr>
<tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">key-value pairs</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined properties</td></tr>
<tr><td style="text-align:left"><code>key</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Original routing key set by producer</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="acknowledging-the-message-1"></a><a href="#acknowledging-the-message-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Acknowledging the message</h4>
<p><strong>In WebSocket</strong>, the reader needs to acknowledge the successful processing of the message to
have the Pulsar WebSocket service update the number of pending messages.
If you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>
}
</code></pre>
<table>
<thead>
<tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID of the processed message</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="error-codes"></a><a href="#error-codes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Error codes</h3>
<p>In case of error the server will close the WebSocket session using the
following error codes:</p>
<table>
<thead>
<tr><th style="text-align:left">Error Code</th><th style="text-align:left">Error Message</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">1</td><td style="text-align:left">Failed to create producer</td></tr>
<tr><td style="text-align:left">2</td><td style="text-align:left">Failed to subscribe</td></tr>
<tr><td style="text-align:left">3</td><td style="text-align:left">Failed to deserialize from JSON</td></tr>
<tr><td style="text-align:left">4</td><td style="text-align:left">Failed to serialize to JSON</td></tr>
<tr><td style="text-align:left">5</td><td style="text-align:left">Failed to authenticate client</td></tr>
<tr><td style="text-align:left">6</td><td style="text-align:left">Client is not authorized</td></tr>
<tr><td style="text-align:left">7</td><td style="text-align:left">Invalid payload encoding</td></tr>
<tr><td style="text-align:left">8</td><td style="text-align:left">Unknown error</td></tr>
</tbody>
</table>
<blockquote>
<p>The application is responsible for re-establishing a new WebSocket session after a backoff period.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="client-examples"></a><a href="#client-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client examples</h2>
<p>Below you'll find code examples for the Pulsar WebSocket API in <a href="#python">Python</a> and <a href="#nodejs">Node.js</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="python"></a><a href="#python" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python</h3>
<p>This example uses the <a href="https://pypi.python.org/pypi/websocket-client"><code>websocket-client</code></a> package. You can install it using <a href="https://pypi.python.org/pypi/pip">pip</a>:</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> pip install websocket-client</span>
</code></pre>
<p>You can also download it from <a href="https://pypi.python.org/pypi/websocket-client">PyPI</a>.</p>
<h4><a class="anchor" aria-hidden="true" id="python-producer"></a><a href="#python-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python producer</h4>
<p>Here's an example Python producer that sends a simple message to a Pulsar <a href="/docs/en/2.6.0/reference-terminology#topic">topic</a>:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> websocket, base64, json
TOPIC = <span class="hljs-string">'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'</span>
ws = websocket.create_connection(TOPIC)
<span class="hljs-comment"># Send one message as JSON</span>
ws.send(json.dumps({
<span class="hljs-string">'payload'</span> : base64.b64encode(<span class="hljs-string">'Hello World'</span>),
<span class="hljs-string">'properties'</span>: {
<span class="hljs-string">'key1'</span> : <span class="hljs-string">'value1'</span>,
<span class="hljs-string">'key2'</span> : <span class="hljs-string">'value2'</span>
},
<span class="hljs-string">'context'</span> : <span class="hljs-number">5</span>
}))
response = json.loads(ws.recv())
<span class="hljs-keyword">if</span> response[<span class="hljs-string">'result'</span>] == <span class="hljs-string">'ok'</span>:
<span class="hljs-keyword">print</span> <span class="hljs-string">'Message published successfully'</span>
<span class="hljs-keyword">else</span>:
<span class="hljs-keyword">print</span> <span class="hljs-string">'Failed to publish message:'</span>, response
ws.close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="python-consumer"></a><a href="#python-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python consumer</h4>
<p>Here's an example Python consumer that listens on a Pulsar topic, prints each message together with its decoded payload whenever a message arrives, and then acknowledges it:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> websocket, base64, json
TOPIC = <span class="hljs-string">'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'</span>
ws = websocket.create_connection(TOPIC)
<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
msg = json.loads(ws.recv())
<span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> msg: <span class="hljs-keyword">break</span>
<span class="hljs-keyword">print</span> <span class="hljs-string">"Received: {} - payload: {}"</span>.format(msg, base64.b64decode(msg[<span class="hljs-string">'payload'</span>]))
<span class="hljs-comment"># Acknowledge successful processing</span>
ws.send(json.dumps({<span class="hljs-string">'messageId'</span> : msg[<span class="hljs-string">'messageId'</span>]}))
ws.close()
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="python-reader"></a><a href="#python-reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python reader</h4>
<p>Here's an example Python reader that listens on a Pulsar topic, prints each message together with its decoded payload whenever a message arrives, and then acknowledges it:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> websocket, base64, json

<span class="hljs-comment"># Reader endpoint: topic "my-topic" in public/default (no subscription name in the URL)</span>
TOPIC = <span class="hljs-string">'ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'</span>

ws = websocket.create_connection(TOPIC)

<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
    msg = json.loads(ws.recv())
    <span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> msg: <span class="hljs-keyword">break</span>

    <span class="hljs-comment"># The payload arrives base64-encoded; decode it before printing</span>
    <span class="hljs-keyword">print</span>(<span class="hljs-string">"Received: {} - payload: {}"</span>.format(msg, base64.b64decode(msg[<span class="hljs-string">'payload'</span>])))

    <span class="hljs-comment"># Acknowledge successful processing</span>
    ws.send(json.dumps({<span class="hljs-string">'messageId'</span> : msg[<span class="hljs-string">'messageId'</span>]}))

ws.close()
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="nodejs"></a><a href="#nodejs" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js</h3>
<p>This example uses the <a href="https://websockets.github.io/ws/"><code>ws</code></a> package. You can install it using <a href="https://www.npmjs.com/">npm</a>:</p>
<!-- Shell snippet shown to readers: installs the ws package with npm. -->
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> npm install ws</span>
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="nodejs-producer"></a><a href="#nodejs-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js producer</h4>
<p>Here's an example Node.js producer that sends a simple message to a Pulsar topic:</p>
<pre><code class="hljs css language-javascript"><span class="hljs-keyword">var</span> WebSocket = <span class="hljs-built_in">require</span>(<span class="hljs-string">'ws'</span>),
    topic = <span class="hljs-string">"ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic"</span>,
    ws = <span class="hljs-keyword">new</span> WebSocket(topic);

<span class="hljs-keyword">var</span> message = {
  <span class="hljs-comment">// The payload is sent base64-encoded</span>
  <span class="hljs-string">"payload"</span> : Buffer.from(<span class="hljs-string">"Hello World"</span>).toString(<span class="hljs-string">'base64'</span>),
  <span class="hljs-string">"properties"</span>: {
    <span class="hljs-string">"key1"</span> : <span class="hljs-string">"value1"</span>,
    <span class="hljs-string">"key2"</span> : <span class="hljs-string">"value2"</span>
  },
  <span class="hljs-string">"context"</span> : <span class="hljs-string">"1"</span>
};

ws.on(<span class="hljs-string">'open'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params"></span>) </span>{
  <span class="hljs-comment">// Send one message</span>
  ws.send(<span class="hljs-built_in">JSON</span>.stringify(message));
});

ws.on(<span class="hljs-string">'message'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">message</span>) </span>{
  <span class="hljs-comment">// Log whatever the service sends back for the published message</span>
  <span class="hljs-built_in">console</span>.log(<span class="hljs-string">'received ack: %s'</span>, message);
});
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="nodejs-consumer"></a><a href="#nodejs-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js consumer</h4>
<p>Here's an example Node.js consumer that listens on the same topic used by the producer above:</p>
<pre><code class="hljs css language-javascript"><span class="hljs-keyword">var</span> WebSocket = <span class="hljs-built_in">require</span>(<span class="hljs-string">'ws'</span>),
    topic = <span class="hljs-string">"ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub"</span>,
    ws = <span class="hljs-keyword">new</span> WebSocket(topic);

ws.on(<span class="hljs-string">'message'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">message</span>) </span>{
  <span class="hljs-keyword">var</span> receiveMsg = <span class="hljs-built_in">JSON</span>.parse(message);
  <span class="hljs-comment">// The payload arrives base64-encoded; decode it before printing</span>
  <span class="hljs-built_in">console</span>.log(<span class="hljs-string">'Received: %s - payload: %s'</span>, message, Buffer.from(receiveMsg.payload, <span class="hljs-string">'base64'</span>).toString());
  <span class="hljs-comment">// Acknowledge the message by sending back its messageId</span>
  <span class="hljs-keyword">var</span> ackMsg = {<span class="hljs-string">"messageId"</span> : receiveMsg.messageId};
  ws.send(<span class="hljs-built_in">JSON</span>.stringify(ackMsg));
});
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="nodejs-reader"></a><a href="#nodejs-reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>NodeJS reader</h4>
<pre><code class="hljs css language-javascript"><span class="hljs-keyword">var</span> WebSocket = <span class="hljs-built_in">require</span>(<span class="hljs-string">'ws'</span>),
    topic = <span class="hljs-string">"ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic"</span>,
    ws = <span class="hljs-keyword">new</span> WebSocket(topic);

ws.on(<span class="hljs-string">'message'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">message</span>) </span>{
  <span class="hljs-keyword">var</span> receiveMsg = <span class="hljs-built_in">JSON</span>.parse(message);
  <span class="hljs-comment">// The payload arrives base64-encoded; decode it before printing</span>
  <span class="hljs-built_in">console</span>.log(<span class="hljs-string">'Received: %s - payload: %s'</span>, message, Buffer.from(receiveMsg.payload, <span class="hljs-string">'base64'</span>).toString());
  <span class="hljs-comment">// Acknowledge the message by sending back its messageId</span>
  <span class="hljs-keyword">var</span> ackMsg = {<span class="hljs-string">"messageId"</span> : receiveMsg.messageId};
  ws.send(<span class="hljs-built_in">JSON</span>.stringify(ackMsg));
});
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.6.0/client-libraries-node"><span class="arrow-prev"></span><span>Node.js</span></a><a class="docs-next button" href="/docs/en/2.6.0/client-libraries-dotnet"><span>C#</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#running-the-websocket-service">Running the WebSocket service</a><ul class="toc-headings"><li><a href="#embedded-with-a-pulsar-broker">Embedded with a Pulsar broker</a></li><li><a href="#as-a-separate-component">As a separate component</a></li><li><a href="#starting-the-broker">Starting the broker</a></li></ul></li><li><a href="#api-reference">API Reference</a><ul class="toc-headings"><li><a href="#producer-endpoint">Producer endpoint</a></li><li><a href="#consumer-endpoint">Consumer endpoint</a></li><li><a href="#reader-endpoint">Reader endpoint</a></li><li><a href="#error-codes">Error codes</a></li></ul></li><li><a href="#client-examples">Client examples</a><ul class="toc-headings"><li><a href="#python">Python</a></li><li><a href="#nodejs">Node.js</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Replaces the contents of the navigation entry that holds the "#community"
// placeholder link with a "Community" drop-down menu, then wires up the click
// handler that shows/hides the drop-down.
const communityAnchor = document.querySelector("a[href='#community']");
// Parent node of the placeholder link; null when the page has no such link,
// in which case the menu is simply not installed instead of throwing.
const community = communityAnchor ? communityAnchor.parentNode : null;
// Markup for the menu. External links open in a new tab, so they carry
// rel="noopener noreferrer"; the &#x2750; glyph marks them as external.
const communityMenu =
  '<li>' +
  '<a id="community-menu" href="#" aria-expanded="false">Community <span style="font-size: 0.75em" aria-hidden="true">&nbsp;▼</span></a>' +
  '<div id="community-dropdown" class="hide">' +
  '<ul id="community-dropdown-items">' +
  '<li><a href="/en/contact">Contact</a></li>' +
  '<li><a href="/en/contributing">Contributing</a></li>' +
  '<li><a href="/en/coding-guide">Coding guide</a></li>' +
  '<li><a href="/en/events">Events</a></li>' +
  '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
  '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
  '<li>&nbsp;</li>' +
  '<li><a href="/en/resources">Resources</a></li>' +
  '<li><a href="/en/team">Team</a></li>' +
  '<li><a href="/en/powered-by">Powered By</a></li>' +
  '</ul>' +
  '</div>' +
  '</li>';
if (community) {
  community.innerHTML = communityMenu;
}
// Both lookups return null when the menu was not installed above.
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
if (communityMenuItem && communityDropDown) {
  communityMenuItem.addEventListener("click", function(event) {
    // The trigger is an href="#" link; stop the browser from jumping to the top.
    event.preventDefault();
    // The drop-down's whole class attribute is swapped between 'hide' and 'visible'.
    const wasHidden = communityDropDown.className === 'hide';
    communityDropDown.className = wasHidden ? 'visible' : 'hide';
    // Keep the state announced to assistive technology in sync with the class.
    communityMenuItem.setAttribute('aria-expanded', String(wasHidden));
  });
}
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>