| <!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar's WebSocket API · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar's [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) API is meant to provide a simple way to interact with Pulsar using languages that do not have an official [client library](/docs/en/2.5.0/client-libraries). Through WebSockets you can publish and consume messages and use all the features available in the [Java](/docs/en/2.5.0/client-libraries-java), [Go](/docs/en/2.5.0/client-libraries-go), [Python](/docs/en/2.5.0/client-libraries-python) and [C++](/docs/en/2.5.0/client-libraries-cpp) client libraries."/><meta name="docsearch:version" content="2.5.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar's WebSocket API · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar's [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API) API is meant to provide a simple way to interact with Pulsar using languages that do not have an official [client library](/docs/en/2.5.0/client-libraries). 
Through WebSockets you can publish and consume messages and use all the features available in the [Java](/docs/en/2.5.0/client-libraries-java), [Go](/docs/en/2.5.0/client-libraries-go), [Python](/docs/en/2.5.0/client-libraries-python) and [C++](/docs/en/2.5.0/client-libraries-cpp) client libraries."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.5.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.5.0/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.5.0/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" 
target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.5.0/client-libraries-websocket">日本語</a></li><li><a href="/docs/fr/2.5.0/client-libraries-websocket">Français</a></li><li><a href="/docs/ko/2.5.0/client-libraries-websocket">한국어</a></li><li><a href="/docs/zh-CN/2.5.0/client-libraries-websocket">中文</a></li><li><a href="/docs/zh-TW/2.5.0/client-libraries-websocket">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script> |
| const languagesMenuItem = document.getElementById("languages-menu"); |
| const languagesDropDown = document.getElementById("languages-dropdown"); |
| languagesMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (languagesDropDown.className == "hide") { |
| languagesDropDown.className = "visible"; |
| } else { |
| languagesDropDown.className = "hide"; |
| } |
| }); |
| </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/getting-started-helm">Run Pulsar in Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.5.0/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/concepts-tiered-storage">Tiered Storage</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a 
class="navItem" href="/docs/en/2.5.0/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul 
class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-jwt">Authentication using JWT</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/client-libraries-node">Node.js</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.5.0/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.5.0/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-non-partitioned-topics">Non-Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-encryption">Encryption</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.5.0/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script> |
| var coll = document.getElementsByClassName('collapsible'); |
| var checkActiveCategory = true; |
| for (var i = 0; i < coll.length; i++) { |
| var links = coll[i].nextElementSibling.getElementsByTagName('*'); |
| if (checkActiveCategory){ |
| for (var j = 0; j < links.length; j++) { |
| if (links[j].classList.contains('navListItemActive')){ |
| coll[i].nextElementSibling.classList.toggle('hide'); |
| coll[i].childNodes[1].classList.toggle('rotate'); |
| checkActiveCategory = false; |
| break; |
| } |
| } |
| } |
| |
| coll[i].addEventListener('click', function() { |
| var arrow = this.childNodes[1]; |
| arrow.classList.toggle('rotate'); |
| var content = this.nextElementSibling; |
| content.classList.toggle('hide'); |
| }); |
| } |
| |
| document.addEventListener('DOMContentLoaded', function() { |
| createToggler('#navToggler', '#docsNav', 'docsSliderActive'); |
| createToggler('#tocToggler', 'body', 'tocActive'); |
| |
| var headings = document.querySelector('.toc-headings'); |
| headings && headings.addEventListener('click', function(event) { |
| var el = event.target; |
| while(el !== headings){ |
| if (el.tagName === 'A') { |
| document.body.classList.remove('tocActive'); |
| break; |
| } else{ |
| el = el.parentNode; |
| } |
| } |
| }, false); |
| |
| function createToggler(togglerSelector, targetSelector, className) { |
| var toggler = document.querySelector(togglerSelector); |
| var target = document.querySelector(targetSelector); |
| |
| if (!toggler) { |
| return; |
| } |
| |
| toggler.onclick = function(event) { |
| event.preventDefault(); |
| |
| target.classList.toggle(className); |
| }; |
| } |
| }); |
| </script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-websocket.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar's WebSocket API</h1></header><article><div><span><p>Pulsar's <a href="https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API">WebSocket</a> API is meant to provide a simple way to interact with Pulsar using languages that do not have an official <a href="/docs/en/2.5.0/client-libraries">client library</a>. Through WebSockets you can publish and consume messages and use all the features available in the <a href="/docs/en/2.5.0/client-libraries-java">Java</a>, <a href="/docs/en/2.5.0/client-libraries-go">Go</a>, <a href="/docs/en/2.5.0/client-libraries-python">Python</a> and <a href="/docs/en/2.5.0/client-libraries-cpp">C++</a> client libraries.</p> |
| <blockquote> |
| <p>You can use Pulsar's WebSocket API with any WebSocket client library. See examples for Python and Node.js <a href="#client-examples">below</a>.</p> |
| </blockquote> |
| <h2><a class="anchor" aria-hidden="true" id="running-the-websocket-service"></a><a href="#running-the-websocket-service" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Running the WebSocket service</h2> |
| <p>The standalone variant of Pulsar that we recommend using for <a href="/docs/en/2.5.0/getting-started-standalone">local development</a> already has the WebSocket service enabled.</p> |
| <p>In non-standalone mode, there are two ways to deploy the WebSocket service:</p> |
| <ul> |
| <li><a href="#embedded-with-a-pulsar-broker">embedded</a> with a Pulsar broker</li> |
| <li>as a <a href="#as-a-separate-component">separate component</a></li> |
| </ul> |
| <h3><a class="anchor" aria-hidden="true" id="embedded-with-a-pulsar-broker"></a><a href="#embedded-with-a-pulsar-broker" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Embedded with a Pulsar broker</h3> |
| <p>In this mode, the WebSocket service will run within the same HTTP service that's already running in the broker. To enable this mode, set the <a href="/docs/en/2.5.0/reference-configuration#broker-webSocketServiceEnabled"><code>webSocketServiceEnabled</code></a> parameter in the <a href="/docs/en/2.5.0/reference-configuration#broker"><code>conf/broker.conf</code></a> configuration file in your installation.</p> |
| <pre><code class="hljs css language-properties"><span class="hljs-attr">webSocketServiceEnabled</span>=<span class="hljs-string">true</span> |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="as-a-separate-component"></a><a href="#as-a-separate-component" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>As a separate component</h3> |
| <p>In this mode, the WebSocket service will be run from a Pulsar <a href="/docs/en/2.5.0/reference-terminology#broker">broker</a> as a separate service. Configuration for this mode is handled in the <a href="/docs/en/2.5.0/reference-configuration#websocket"><code>conf/websocket.conf</code></a> configuration file. You'll need to set <em>at least</em> the following parameters:</p> |
| <ul> |
| <li><a href="/docs/en/2.5.0/reference-configuration#websocket-configurationStoreServers"><code>configurationStoreServers</code></a></li> |
| <li><a href="/docs/en/2.5.0/reference-configuration#websocket-webServicePort"><code>webServicePort</code></a></li> |
| <li><a href="/docs/en/2.5.0/reference-configuration#websocket-clusterName"><code>clusterName</code></a></li> |
| </ul> |
| <p>Here's an example:</p> |
| <pre><code class="hljs css language-properties"><span class="hljs-attr">configurationStoreServers</span>=<span class="hljs-string">zk1:2181,zk2:2181,zk3:2181</span> |
| <span class="hljs-attr">webServicePort</span>=<span class="hljs-string">8080</span> |
| <span class="hljs-attr">clusterName</span>=<span class="hljs-string">my-cluster</span> |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="starting-the-broker"></a><a href="#starting-the-broker" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Starting the broker</h3> |
| <p>When the configuration is set, you can start the service using the <a href="/docs/en/2.5.0/reference-cli-tools#pulsar-daemon"><code>pulsar-daemon</code></a> tool:</p> |
| <pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> bin/pulsar-daemon start websocket</span> |
| </code></pre> |
| <h2><a class="anchor" aria-hidden="true" id="api-reference"></a><a href="#api-reference" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>API Reference</h2> |
| <p>Pulsar's WebSocket API offers three endpoints for <a href="#producer-endpoint">producing</a> messages, <a href="#consumer-endpoint">consuming</a> messages and <a href="#reader-endpoint">reading</a> messages.</p> |
| <p>All exchanges via the WebSocket API use JSON.</p> |
| <h3><a class="anchor" aria-hidden="true" id="producer-endpoint"></a><a href="#producer-endpoint" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer endpoint</h3> |
| <p>The producer endpoint requires you to specify a tenant, namespace, and topic in the URL:</p> |
| <pre><code class="hljs css language-http">ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic |
| </code></pre> |
| <h5><a class="anchor" aria-hidden="true" id="query-param"></a><a href="#query-param" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query param</h5> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>sendTimeoutMillis</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Send timeout (default: 30 secs)</td></tr> |
| <tr><td style="text-align:left"><code>batchingEnabled</code></td><td style="text-align:left">boolean</td><td style="text-align:left">no</td><td style="text-align:left">Enable batching of messages (default: false)</td></tr> |
| <tr><td style="text-align:left"><code>batchingMaxMessages</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Maximum number of messages permitted in a batch (default: 1000)</td></tr> |
| <tr><td style="text-align:left"><code>maxPendingMessages</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Set the max size of the internal-queue holding the messages (default: 1000)</td></tr> |
| <tr><td style="text-align:left"><code>batchingMaxPublishDelay</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Time period within which the messages will be batched (default: 10ms)</td></tr> |
| <tr><td style="text-align:left"><code>messageRoutingMode</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Message <a href="https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/index.html?org/apache/pulsar/client/api/ProducerConfiguration.MessageRoutingMode.html">routing mode</a> for the partitioned producer: <code>SinglePartition</code>, <code>RoundRobinPartition</code></td></tr> |
| <tr><td style="text-align:left"><code>compressionType</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Compression <a href="https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/index.html?org/apache/pulsar/client/api/CompressionType.html">type</a>: <code>LZ4</code>, <code>ZLIB</code></td></tr> |
| <tr><td style="text-align:left"><code>producerName</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Specify the name for the producer. Pulsar enforces that only one producer with a given name can publish on a topic at a time.</td></tr> |
| <tr><td style="text-align:left"><code>initialSequenceId</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Set the baseline for the sequence ids for messages published by the producer.</td></tr> |
| <tr><td style="text-align:left"><code>hashingScheme</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left"><a href="http://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ProducerConfiguration.HashingScheme.html">Hashing function</a> to use when publishing on a partitioned topic: <code>JavaStringHash</code>, <code>Murmur3_32Hash</code></td></tr> |
| </tbody> |
| </table> |
| <h4><a class="anchor" aria-hidden="true" id="publishing-a-message"></a><a href="#publishing-a-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Publishing a message</h4> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"payload"</span>: <span class="hljs-string">"SGVsbG8gV29ybGQ="</span>, |
| <span class="hljs-attr">"properties"</span>: {<span class="hljs-attr">"key1"</span>: <span class="hljs-string">"value1"</span>, <span class="hljs-attr">"key2"</span>: <span class="hljs-string">"value2"</span>}, |
| <span class="hljs-attr">"context"</span>: <span class="hljs-string">"1"</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>payload</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Base-64 encoded payload</td></tr> |
| <tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">key-value pairs</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined properties</td></tr> |
| <tr><td style="text-align:left"><code>context</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined request identifier</td></tr> |
| <tr><td style="text-align:left"><code>key</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">For partitioned topics, decides which partition to use</td></tr> |
| <tr><td style="text-align:left"><code>replicationClusters</code></td><td style="text-align:left">array</td><td style="text-align:left">no</td><td style="text-align:left">Restrict replication to this list of <a href="/docs/en/2.5.0/reference-terminology#cluster">clusters</a>, specified by name</td></tr> |
| </tbody> |
| </table> |
| <h5><a class="anchor" aria-hidden="true" id="example-success-response"></a><a href="#example-success-response" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example success response</h5> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"result"</span>: <span class="hljs-string">"ok"</span>, |
| <span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>, |
| <span class="hljs-attr">"context"</span>: <span class="hljs-string">"1"</span> |
| } |
| </code></pre> |
| <h5><a class="anchor" aria-hidden="true" id="example-failure-response"></a><a href="#example-failure-response" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example failure response</h5> |
| <pre><code class="hljs css language-json"> { |
| <span class="hljs-attr">"result"</span>: <span class="hljs-string">"send-error:3"</span>, |
| <span class="hljs-attr">"errorMsg"</span>: <span class="hljs-string">"Failed to de-serialize from JSON"</span>, |
| <span class="hljs-attr">"context"</span>: <span class="hljs-string">"1"</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>result</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left"><code>ok</code> if successful or an error message if unsuccessful</td></tr> |
| <tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID assigned to the published message</td></tr> |
| <tr><td style="text-align:left"><code>context</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined request identifier</td></tr> |
| </tbody> |
| </table> |
| <h3><a class="anchor" aria-hidden="true" id="consumer-endpoint"></a><a href="#consumer-endpoint" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer endpoint</h3> |
| <p>The consumer endpoint requires you to specify a tenant, namespace, and topic, as well as a subscription, in the URL:</p> |
| <pre><code class="hljs css language-http">ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription |
| </code></pre> |
| <h5><a class="anchor" aria-hidden="true" id="query-param-1"></a><a href="#query-param-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query param</h5> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>ackTimeoutMillis</code></td><td style="text-align:left">long</td><td style="text-align:left">no</td><td style="text-align:left">Set the timeout for unacked messages (default: 0)</td></tr> |
| <tr><td style="text-align:left"><code>subscriptionType</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left"><a href="https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/index.html?org/apache/pulsar/client/api/SubscriptionType.html">Subscription type</a>: <code>Exclusive</code>, <code>Failover</code>, <code>Shared</code></td></tr> |
| <tr><td style="text-align:left"><code>receiverQueueSize</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Size of the consumer receive queue (default: 1000)</td></tr> |
| <tr><td style="text-align:left"><code>consumerName</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Consumer name</td></tr> |
| <tr><td style="text-align:left"><code>priorityLevel</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Define a <a href="https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-">priority</a> for the consumer</td></tr> |
| <tr><td style="text-align:left"><code>maxRedeliverCount</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Define a <a href="https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-">maxRedeliverCount</a> for the consumer (default: 0). Activates the <a href="https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic">Dead Letter Topic</a> feature.</td></tr> |
| <tr><td style="text-align:left"><code>deadLetterTopic</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Define a <a href="https://pulsar.apache.org/api/client/2.5.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-">deadLetterTopic</a> for the consumer (default: {topic}-{subscription}-DLQ). Activates the <a href="https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic">Dead Letter Topic</a> feature.</td></tr> |
| <tr><td style="text-align:left"><code>pullMode</code></td><td style="text-align:left">boolean</td><td style="text-align:left">no</td><td style="text-align:left">Enable pull mode (default: false). See "Flow Control" below.</td></tr> |
| </tbody> |
| </table> |
| <p>NB: these parameters (except <code>pullMode</code>) apply to the internal consumer of the WebSocket service. |
| So messages will be subject to the redelivery settings as soon as they get into the receive queue, |
| even if the client doesn't consume on the WebSocket.</p> |
| <h5><a class="anchor" aria-hidden="true" id="receiving-messages"></a><a href="#receiving-messages" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Receiving messages</h5> |
| <p>Server will push messages on the WebSocket session:</p> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>, |
| <span class="hljs-attr">"payload"</span>: <span class="hljs-string">"SGVsbG8gV29ybGQ="</span>, |
| <span class="hljs-attr">"properties"</span>: {<span class="hljs-attr">"key1"</span>: <span class="hljs-string">"value1"</span>, <span class="hljs-attr">"key2"</span>: <span class="hljs-string">"value2"</span>}, |
| <span class="hljs-attr">"publishTime"</span>: <span class="hljs-string">"2016-08-30 16:45:57.785"</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID</td></tr> |
| <tr><td style="text-align:left"><code>payload</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Base-64 encoded payload</td></tr> |
| <tr><td style="text-align:left"><code>publishTime</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Publish timestamp</td></tr> |
| <tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">key-value pairs</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined properties</td></tr> |
| <tr><td style="text-align:left"><code>key</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Original routing key set by producer</td></tr> |
| </tbody> |
| </table> |
| <h4><a class="anchor" aria-hidden="true" id="acknowledging-the-message"></a><a href="#acknowledging-the-message" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Acknowledging the message</h4> |
| <p>The consumer needs to acknowledge the successful processing of the message to |
| have the Pulsar broker delete it.</p> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID of the processed message</td></tr> |
| </tbody> |
| </table> |
| <h4><a class="anchor" aria-hidden="true" id="flow-control"></a><a href="#flow-control" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Flow control</h4> |
| <h5><a class="anchor" aria-hidden="true" id="push-mode"></a><a href="#push-mode" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Push Mode</h5> |
| <p>By default (<code>pullMode=false</code>), the consumer endpoint will use the <code>receiverQueueSize</code> parameter both to size its |
| internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client. |
| In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching |
| <code>receiverQueueSize</code> unacked messages sent to the WebSocket client.</p> |
| <h5><a class="anchor" aria-hidden="true" id="pull-mode"></a><a href="#pull-mode" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pull Mode</h5> |
| <p>If you set <code>pullMode</code> to <code>true</code>, the WebSocket client will need to send <code>permit</code> commands to permit the |
| Pulsar WebSocket service to send more messages.</p> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"type"</span>: <span class="hljs-string">"permit"</span>, |
| <span class="hljs-attr">"permitMessages"</span>: <span class="hljs-number">100</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>type</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Type of command. Must be <code>permit</code></td></tr> |
| <tr><td style="text-align:left"><code>permitMessages</code></td><td style="text-align:left">int</td><td style="text-align:left">yes</td><td style="text-align:left">Number of messages to permit</td></tr> |
| </tbody> |
| </table> |
| <p>NB: in this mode it's possible to acknowledge messages in a different connection.</p> |
| <h3><a class="anchor" aria-hidden="true" id="reader-endpoint"></a><a href="#reader-endpoint" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader endpoint</h3> |
| <p>The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:</p> |
| <pre><code class="hljs css language-http">ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic |
| </code></pre> |
| <h5><a class="anchor" aria-hidden="true" id="query-param-2"></a><a href="#query-param-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query param</h5> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>readerName</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Reader name</td></tr> |
| <tr><td style="text-align:left"><code>receiverQueueSize</code></td><td style="text-align:left">int</td><td style="text-align:left">no</td><td style="text-align:left">Size of the consumer receive queue (default: 1000)</td></tr> |
| <tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">int or enum</td><td style="text-align:left">no</td><td style="text-align:left">Message ID to start from, <code>earliest</code> or <code>latest</code> (default: <code>latest</code>)</td></tr> |
| </tbody> |
| </table> |
| <h5><a class="anchor" aria-hidden="true" id="receiving-messages-1"></a><a href="#receiving-messages-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Receiving messages</h5> |
| <p>Server will push messages on the WebSocket session:</p> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span>, |
| <span class="hljs-attr">"payload"</span>: <span class="hljs-string">"SGVsbG8gV29ybGQ="</span>, |
| <span class="hljs-attr">"properties"</span>: {<span class="hljs-attr">"key1"</span>: <span class="hljs-string">"value1"</span>, <span class="hljs-attr">"key2"</span>: <span class="hljs-string">"value2"</span>}, |
| <span class="hljs-attr">"publishTime"</span>: <span class="hljs-string">"2016-08-30 16:45:57.785"</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID</td></tr> |
| <tr><td style="text-align:left"><code>payload</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Base-64 encoded payload</td></tr> |
| <tr><td style="text-align:left"><code>publishTime</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Publish timestamp</td></tr> |
| <tr><td style="text-align:left"><code>properties</code></td><td style="text-align:left">key-value pairs</td><td style="text-align:left">no</td><td style="text-align:left">Application-defined properties</td></tr> |
| <tr><td style="text-align:left"><code>key</code></td><td style="text-align:left">string</td><td style="text-align:left">no</td><td style="text-align:left">Original routing key set by producer</td></tr> |
| </tbody> |
| </table> |
| <h4><a class="anchor" aria-hidden="true" id="acknowledging-the-message-1"></a><a href="#acknowledging-the-message-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Acknowledging the message</h4> |
| <p><strong>In WebSocket</strong>, the reader needs to acknowledge the successful processing of the message to |
| have the Pulsar WebSocket service update the number of pending messages. |
| If you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.</p> |
| <pre><code class="hljs css language-json">{ |
| <span class="hljs-attr">"messageId"</span>: <span class="hljs-string">"CAAQAw=="</span> |
| } |
| </code></pre> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Key</th><th style="text-align:left">Type</th><th style="text-align:left">Required?</th><th style="text-align:left">Explanation</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left"><code>messageId</code></td><td style="text-align:left">string</td><td style="text-align:left">yes</td><td style="text-align:left">Message ID of the processed message</td></tr> |
| </tbody> |
| </table> |
| <h3><a class="anchor" aria-hidden="true" id="error-codes"></a><a href="#error-codes" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Error codes</h3> |
| <p>In case of error the server will close the WebSocket session using the |
| following error codes:</p> |
| <table> |
| <thead> |
| <tr><th style="text-align:left">Error Code</th><th style="text-align:left">Error Message</th></tr> |
| </thead> |
| <tbody> |
| <tr><td style="text-align:left">1</td><td style="text-align:left">Failed to create producer</td></tr> |
| <tr><td style="text-align:left">2</td><td style="text-align:left">Failed to subscribe</td></tr> |
| <tr><td style="text-align:left">3</td><td style="text-align:left">Failed to deserialize from JSON</td></tr> |
| <tr><td style="text-align:left">4</td><td style="text-align:left">Failed to serialize to JSON</td></tr> |
| <tr><td style="text-align:left">5</td><td style="text-align:left">Failed to authenticate client</td></tr> |
| <tr><td style="text-align:left">6</td><td style="text-align:left">Client is not authorized</td></tr> |
| <tr><td style="text-align:left">7</td><td style="text-align:left">Invalid payload encoding</td></tr> |
| <tr><td style="text-align:left">8</td><td style="text-align:left">Unknown error</td></tr> |
| </tbody> |
| </table> |
| <blockquote> |
| <p>The application is responsible for re-establishing a new WebSocket session after a backoff period.</p> |
| </blockquote> |
| <h2><a class="anchor" aria-hidden="true" id="client-examples"></a><a href="#client-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Client examples</h2> |
| <p>Below you'll find code examples for the Pulsar WebSocket API in <a href="#python">Python</a> and <a href="#nodejs">Node.js</a>.</p> |
| <h3><a class="anchor" aria-hidden="true" id="python"></a><a href="#python" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python</h3> |
| <p>This example uses the <a href="https://pypi.python.org/pypi/websocket-client"><code>websocket-client</code></a> package. You can install it using <a href="https://pypi.python.org/pypi/pip">pip</a>:</p> |
| <pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> pip install websocket-client</span> |
| </code></pre> |
| <p>You can also download it from <a href="https://pypi.python.org/pypi/websocket-client">PyPI</a>.</p> |
| <h4><a class="anchor" aria-hidden="true" id="python-producer"></a><a href="#python-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python producer</h4> |
| <p>Here's an example Python producer that sends a simple message to a Pulsar <a href="/docs/en/2.5.0/reference-terminology#topic">topic</a>:</p> |
| <pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> websocket, base64, json |
| |
| TOPIC = <span class="hljs-string">'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'</span> |
| |
| ws = websocket.create_connection(TOPIC) |
| |
| <span class="hljs-comment"># Send one message as JSON</span> |
| ws.send(json.dumps({ |
| <span class="hljs-string">'payload'</span> : base64.b64encode(<span class="hljs-string">'Hello World'</span>), |
| <span class="hljs-string">'properties'</span>: { |
| <span class="hljs-string">'key1'</span> : <span class="hljs-string">'value1'</span>, |
| <span class="hljs-string">'key2'</span> : <span class="hljs-string">'value2'</span> |
| }, |
| <span class="hljs-string">'context'</span> : <span class="hljs-number">5</span> |
| })) |
| |
| response = json.loads(ws.recv()) |
| <span class="hljs-keyword">if</span> response[<span class="hljs-string">'result'</span>] == <span class="hljs-string">'ok'</span>: |
| <span class="hljs-keyword">print</span> <span class="hljs-string">'Message published successfully'</span> |
| <span class="hljs-keyword">else</span>: |
| <span class="hljs-keyword">print</span> <span class="hljs-string">'Failed to publish message:'</span>, response |
| ws.close() |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="python-consumer"></a><a href="#python-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python consumer</h4> |
| <p>Here's an example Python consumer that listens on a Pulsar topic and, whenever a message arrives, prints the message along with its decoded payload and then acknowledges it:</p> |
| <pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> websocket, base64, json |
| |
| TOPIC = <span class="hljs-string">'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'</span> |
| |
| ws = websocket.create_connection(TOPIC) |
| |
| <span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>: |
|     msg = json.loads(ws.recv()) |
|     <span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> msg: <span class="hljs-keyword">break</span> |
| |
|     <span class="hljs-comment"># The payload is base64-encoded; decode it before printing</span> |
|     <span class="hljs-keyword">print</span>(<span class="hljs-string">"Received: {} - payload: {}"</span>.format(msg, base64.b64decode(msg[<span class="hljs-string">'payload'</span>]))) |
| |
|     <span class="hljs-comment"># Acknowledge successful processing</span> |
|     ws.send(json.dumps({<span class="hljs-string">'messageId'</span> : msg[<span class="hljs-string">'messageId'</span>]})) |
| |
| ws.close() |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="python-reader"></a><a href="#python-reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python reader</h4> |
| <p>Here's an example Python reader that listens on a Pulsar topic, prints each received message together with its decoded payload, and then acknowledges it:</p> |
| <pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> websocket, base64, json |
| |
| TOPIC = <span class="hljs-string">'ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'</span> |
| |
| ws = websocket.create_connection(TOPIC) |
| |
| <span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>: |
|     msg = json.loads(ws.recv()) |
|     <span class="hljs-keyword">if</span> <span class="hljs-keyword">not</span> msg: <span class="hljs-keyword">break</span> |
| |
|     <span class="hljs-comment"># The payload is base64-encoded; decode it before printing</span> |
|     <span class="hljs-keyword">print</span>(<span class="hljs-string">"Received: {} - payload: {}"</span>.format(msg, base64.b64decode(msg[<span class="hljs-string">'payload'</span>]))) |
| |
|     <span class="hljs-comment"># Acknowledge successful processing</span> |
|     ws.send(json.dumps({<span class="hljs-string">'messageId'</span> : msg[<span class="hljs-string">'messageId'</span>]})) |
| |
| ws.close() |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="nodejs"></a><a href="#nodejs" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js</h3> |
| <p>This example uses the <a href="https://websockets.github.io/ws/"><code>ws</code></a> package. You can install it using <a href="https://www.npmjs.com/">npm</a>:</p> |
| <pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> npm install ws</span> |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="nodejs-producer"></a><a href="#nodejs-producer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js producer</h4> |
| <p>Here's an example Node.js producer that sends a simple message to a Pulsar topic:</p> |
| <pre><code class="hljs css language-javascript"><span class="hljs-keyword">var</span> WebSocket = <span class="hljs-built_in">require</span>(<span class="hljs-string">'ws'</span>), |
|     topic = <span class="hljs-string">"ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic"</span>, |
|     ws = <span class="hljs-keyword">new</span> WebSocket(topic); |
| |
| <span class="hljs-keyword">var</span> message = { |
|   <span class="hljs-comment">// Payloads must be base64-encoded; Buffer.from() replaces the deprecated Buffer constructor</span> |
|   <span class="hljs-string">"payload"</span> : Buffer.from(<span class="hljs-string">"Hello World"</span>).toString(<span class="hljs-string">'base64'</span>), |
|   <span class="hljs-string">"properties"</span>: { |
|     <span class="hljs-string">"key1"</span> : <span class="hljs-string">"value1"</span>, |
|     <span class="hljs-string">"key2"</span> : <span class="hljs-string">"value2"</span> |
|   }, |
|   <span class="hljs-string">"context"</span> : <span class="hljs-string">"1"</span> |
| }; |
| |
| ws.on(<span class="hljs-string">'open'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params"></span>) </span>{ |
|   <span class="hljs-comment">// Send one message</span> |
|   ws.send(<span class="hljs-built_in">JSON</span>.stringify(message)); |
| }); |
| |
| ws.on(<span class="hljs-string">'message'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">ack</span>) </span>{ |
|   <span class="hljs-built_in">console</span>.log(<span class="hljs-string">'received ack: %s'</span>, ack); |
| }); |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="nodejs-consumer"></a><a href="#nodejs-consumer" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js consumer</h4> |
| <p>Here's an example Node.js consumer that listens on the same topic used by the producer above:</p> |
| <pre><code class="hljs css language-javascript"><span class="hljs-keyword">var</span> WebSocket = <span class="hljs-built_in">require</span>(<span class="hljs-string">'ws'</span>), |
|     topic = <span class="hljs-string">"ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub"</span>, |
|     ws = <span class="hljs-keyword">new</span> WebSocket(topic); |
| |
| ws.on(<span class="hljs-string">'message'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">message</span>) </span>{ |
|   <span class="hljs-keyword">var</span> receiveMsg = <span class="hljs-built_in">JSON</span>.parse(message); |
|   <span class="hljs-comment">// Decode the base64 payload; Buffer.from() replaces the deprecated Buffer constructor</span> |
|   <span class="hljs-built_in">console</span>.log(<span class="hljs-string">'Received: %s - payload: %s'</span>, message, Buffer.from(receiveMsg.payload, <span class="hljs-string">'base64'</span>).toString()); |
|   <span class="hljs-comment">// Acknowledge successful processing</span> |
|   <span class="hljs-keyword">var</span> ackMsg = {<span class="hljs-string">"messageId"</span> : receiveMsg.messageId}; |
|   ws.send(<span class="hljs-built_in">JSON</span>.stringify(ackMsg)); |
| }); |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="nodejs-reader"></a><a href="#nodejs-reader" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Node.js reader</h4> |
| <p>Here's an example Node.js reader that reads messages from the same topic used by the producer above:</p> |
| <pre><code class="hljs css language-javascript"><span class="hljs-keyword">var</span> WebSocket = <span class="hljs-built_in">require</span>(<span class="hljs-string">'ws'</span>), |
|     topic = <span class="hljs-string">"ws://localhost:8080/ws/v2/reader/persistent/public/default/my-topic"</span>, |
|     ws = <span class="hljs-keyword">new</span> WebSocket(topic); |
| |
| ws.on(<span class="hljs-string">'message'</span>, <span class="hljs-function"><span class="hljs-keyword">function</span>(<span class="hljs-params">message</span>) </span>{ |
|   <span class="hljs-keyword">var</span> receiveMsg = <span class="hljs-built_in">JSON</span>.parse(message); |
|   <span class="hljs-comment">// Decode the base64 payload; Buffer.from() replaces the deprecated Buffer constructor</span> |
|   <span class="hljs-built_in">console</span>.log(<span class="hljs-string">'Received: %s - payload: %s'</span>, message, Buffer.from(receiveMsg.payload, <span class="hljs-string">'base64'</span>).toString()); |
|   <span class="hljs-comment">// Acknowledge successful processing</span> |
|   <span class="hljs-keyword">var</span> ackMsg = {<span class="hljs-string">"messageId"</span> : receiveMsg.messageId}; |
|   ws.send(<span class="hljs-built_in">JSON</span>.stringify(ackMsg)); |
| }); |
| </code></pre> |
| </span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.5.0/client-libraries-node"><span class="arrow-prev">← </span><span>Node.js</span></a><a class="docs-next button" href="/docs/en/2.5.0/admin-api-overview"><span>Overview</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#running-the-websocket-service">Running the WebSocket service</a><ul class="toc-headings"><li><a href="#embedded-with-a-pulsar-broker">Embedded with a Pulsar broker</a></li><li><a href="#as-a-separate-component">As a separate component</a></li><li><a href="#starting-the-broker">Starting the broker</a></li></ul></li><li><a href="#api-reference">API Reference</a><ul class="toc-headings"><li><a href="#producer-endpoint">Producer endpoint</a></li><li><a href="#consumer-endpoint">Consumer endpoint</a></li><li><a href="#reader-endpoint">Reader endpoint</a></li><li><a href="#error-codes">Error codes</a></li></ul></li><li><a href="#client-examples">Client examples</a><ul class="toc-headings"><li><a href="#python">Python</a></li><li><a href="#nodejs">Node.js</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script> |
| const community = document.querySelector("a[href='#community']").parentNode; |
| const communityMenu = |
| '<li>' + |
| '<a id="community-menu" href="#">Community <span style="font-size: 0.75em"> ▼</span></a>' + |
| '<div id="community-dropdown" class="hide">' + |
| '<ul id="community-dropdown-items">' + |
| '<li><a href="/en/contact">Contact</a></li>' + |
| '<li><a href="/en/contributing">Contributing</a></li>' + |
| '<li><a href="/en/coding-guide">Coding guide</a></li>' + |
| '<li><a href="/en/events">Events</a></li>' + |
| '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking ❐</a></li>' + |
| '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit ❐</a></li>' + |
| '<li> </li>' + |
| '<li><a href="/en/resources">Resources</a></li>' + |
| '<li><a href="/en/team">Team</a></li>' + |
| '<li><a href="/en/powered-by">Powered By</a></li>' + |
| '</ul>' + |
| '</div>' + |
| '</li>'; |
| |
| community.innerHTML = communityMenu; |
| |
| const communityMenuItem = document.getElementById("community-menu"); |
| const communityDropDown = document.getElementById("community-dropdown"); |
| communityMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (communityDropDown.className == 'hide') { |
| communityDropDown.className = 'visible'; |
| } else { |
| communityDropDown.className = 'hide'; |
| } |
| }); |
| </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html> |