blob: 3761a128723a5b3d211175c2ccf52409c6a1ce02 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Pulsar Python client · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="Pulsar Python client library is a wrapper over the existing [C++ client library](/docs/en/client-libraries-cpp) and exposes all of the [same features](/api/cpp/2.10.0-SNAPSHOT). You can find the code in the [Python directory](https://github.com/apache/pulsar/tree/master/pulsar-client-cpp/python) of the C++ client code."/><meta name="docsearch:version" content="2.10.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Pulsar Python client · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="Pulsar Python client library is a wrapper over the existing [C++ client library](/docs/en/client-libraries-cpp) and exposes all of the [same features](/api/cpp/2.10.0-SNAPSHOT). You can find the code in the [Python directory](https://github.com/apache/pulsar/tree/master/pulsar-client-cpp/python) of the C++ client code."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.10.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/client-libraries-python">日本語</a></li><li><a href="/docs/fr/client-libraries-python">Français</a></li><li><a href="/docs/ko/client-libraries-python">한국어</a></li><li><a href="/docs/zh-CN/client-libraries-python">中文</a></li><li><a href="/docs/zh-TW/client-libraries-python">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Client Libraries</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-azure">Azure BlobStore offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/tiered-storage-aliyun">Aliyun OSS offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-dcos">DC/OS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-policy-and-supported-versions">Security Policy and Supported Versions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-extending">Extend Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-go">Go</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-dotnet">C#</a></li><li class="navListItem"><a class="navItem" href="/docs/en/client-libraries-rest">REST</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/develop-plugin">Plugin</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/client-libraries-python.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Pulsar Python client</h1></header><article><div><span><p>Pulsar Python client library is a wrapper over the existing <a href="/docs/en/client-libraries-cpp">C++ client library</a> and exposes all of the <a href="/api/cpp/2.10.0-SNAPSHOT">same features</a>. You can find the code in the <a href="https://github.com/apache/pulsar/tree/master/pulsar-client-cpp/python">Python directory</a> of the C++ client code.</p>
<p>All the methods in producer, consumer, and reader of a Python client are thread-safe.</p>
<p><a href="https://github.com/BurntSushi/pdoc">pdoc</a>-generated API docs for the Python client are available <a href="/api/python/2.10.0">here</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="install"></a><a href="#install" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Install</h2>
<p>You can install the <a href="https://pypi.python.org/pypi/pulsar-client"><code>pulsar-client</code></a> library either via <a href="https://pypi.python.org/pypi">PyPi</a>, using <a href="#installation-using-pip">pip</a>, or by building the library from <a href="https://github.com/apache/pulsar/tree/master/pulsar-client-cpp">source</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="install-using-pip"></a><a href="#install-using-pip" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Install using pip</h3>
<p>To install the <code>pulsar-client</code> library as a pre-built package using the <a href="https://pip.pypa.io/en/stable/">pip</a> package manager:</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> pip install pulsar-client==2.10.0</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="optional-dependencies"></a><a href="#optional-dependencies" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Optional dependencies</h3>
<p>If you install the client libraries on Linux to support services like Pulsar functions or Avro serialization, you can install optional components alongside the <code>pulsar-client</code> library.</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">#</span><span class="bash"> avro serialization</span>
<span class="hljs-meta">$</span><span class="bash"> pip install pulsar-client[avro]==<span class="hljs-string">'2.10.0'</span></span>
<span class="hljs-meta">
#</span><span class="bash"> <span class="hljs-built_in">functions</span> runtime</span>
<span class="hljs-meta">$</span><span class="bash"> pip install pulsar-client[<span class="hljs-built_in">functions</span>]==<span class="hljs-string">'2.10.0'</span></span>
<span class="hljs-meta">
#</span><span class="bash"> all optional components</span>
<span class="hljs-meta">$</span><span class="bash"> pip install pulsar-client[all]==<span class="hljs-string">'2.10.0'</span></span>
</code></pre>
<p>Installation via PyPi is available for the following Python versions:</p>
<table>
<thead>
<tr><th style="text-align:left">Platform</th><th style="text-align:left">Supported Python versions</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">MacOS <br /> 10.13 (High Sierra), 10.14 (Mojave) <br /></td><td style="text-align:left">2.7, 3.7, 3.8, 3.9</td></tr>
<tr><td style="text-align:left">Linux</td><td style="text-align:left">2.7, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="install-from-source"></a><a href="#install-from-source" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Install from source</h3>
<p>To install the <code>pulsar-client</code> library by building from source, follow <a href="/docs/en/client-libraries-cpp#compilation">instructions</a> and compile the Pulsar C++ client library. That builds the Python binding for the library.</p>
<p>To install the built Python bindings:</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> git <span class="hljs-built_in">clone</span> https://github.com/apache/pulsar</span>
<span class="hljs-meta">$</span><span class="bash"> <span class="hljs-built_in">cd</span> pulsar/pulsar-client-cpp/python</span>
<span class="hljs-meta">$</span><span class="bash"> sudo python setup.py install</span>
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="api-reference"></a><a href="#api-reference" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>API Reference</h2>
<p>The complete Python API reference is available at <a href="/api/python/2.10.0">api/python</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="examples"></a><a href="#examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Examples</h2>
<p>You can find a variety of Python code examples for the <a href="/pulsar-client-cpp/python">pulsar-client</a> library.</p>
<h3><a class="anchor" aria-hidden="true" id="producer-example"></a><a href="#producer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Producer example</h3>
<p>The following example creates a Python producer for the <code>my-topic</code> topic and sends 10 messages on that topic:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> pulsar
client = pulsar.Client(<span class="hljs-string">'pulsar://localhost:6650'</span>)
producer = client.create_producer(<span class="hljs-string">'my-topic'</span>)
<span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">10</span>):
producer.send((<span class="hljs-string">'Hello-%d'</span> % i).encode(<span class="hljs-string">'utf-8'</span>))
client.close()
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="consumer-example"></a><a href="#consumer-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Consumer example</h3>
<p>The following example creates a consumer with the <code>my-subscription</code> subscription name on the <code>my-topic</code> topic, receives incoming messages, prints the content and ID of messages that arrive, and acknowledges each message to the Pulsar broker.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> pulsar
client = pulsar.Client(<span class="hljs-string">'pulsar://localhost:6650'</span>)
consumer = client.subscribe(<span class="hljs-string">'my-topic'</span>, <span class="hljs-string">'my-subscription'</span>)
<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
msg = consumer.receive()
<span class="hljs-keyword">try</span>:
print(<span class="hljs-string">"Received message '{}' id='{}'"</span>.format(msg.data(), msg.message_id()))
<span class="hljs-comment"># Acknowledge successful processing of the message</span>
consumer.acknowledge(msg)
<span class="hljs-keyword">except</span> Exception:
<span class="hljs-comment"># Message failed to be processed</span>
consumer.negative_acknowledge(msg)
client.close()
</code></pre>
<p>This example shows how to configure negative acknowledgement.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Client, schema
client = Client(<span class="hljs-string">'pulsar://localhost:6650'</span>)
consumer = client.subscribe(<span class="hljs-string">'negative_acks'</span>,<span class="hljs-string">'test'</span>,schema=schema.StringSchema())
producer = client.create_producer(<span class="hljs-string">'negative_acks'</span>,schema=schema.StringSchema())
<span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">10</span>):
print(<span class="hljs-string">'send msg "hello-%d"'</span> % i)
producer.send_async(<span class="hljs-string">'hello-%d'</span> % i, callback=<span class="hljs-literal">None</span>)
producer.flush()
<span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">10</span>):
msg = consumer.receive()
consumer.negative_acknowledge(msg)
print(<span class="hljs-string">'receive and nack msg "%s"'</span> % msg.data())
<span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> range(<span class="hljs-number">10</span>):
msg = consumer.receive()
consumer.acknowledge(msg)
print(<span class="hljs-string">'receive and ack msg "%s"'</span> % msg.data())
<span class="hljs-keyword">try</span>:
<span class="hljs-comment"># No more messages expected</span>
msg = consumer.receive(<span class="hljs-number">100</span>)
<span class="hljs-keyword">except</span>:
print(<span class="hljs-string">"no more msg"</span>)
<span class="hljs-keyword">pass</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="reader-interface-example"></a><a href="#reader-interface-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Reader interface example</h3>
<p>You can use the Pulsar Python API to use the Pulsar <a href="/docs/en/concepts-clients#reader-interface">reader interface</a>. Here's an example:</p>
<pre><code class="hljs css language-python"><span class="hljs-comment"># MessageId taken from a previously fetched message</span>
msg_id = msg.message_id()
reader = client.create_reader(<span class="hljs-string">'my-topic'</span>, msg_id)
<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
msg = reader.read_next()
print(<span class="hljs-string">"Received message '{}' id='{}'"</span>.format(msg.data(), msg.message_id()))
<span class="hljs-comment"># No acknowledgment</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="multi-topic-subscriptions"></a><a href="#multi-topic-subscriptions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Multi-topic subscriptions</h3>
<p>In addition to subscribing a consumer to a single Pulsar topic, you can also subscribe to multiple topics simultaneously. To use multi-topic subscriptions, you can supply a regular expression (regex) or a <code>List</code> of topics. If you select topics via regex, all topics must be within the same Pulsar namespace.</p>
<p>The following is an example:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> re
consumer = client.subscribe(re.compile(<span class="hljs-string">'persistent://public/default/topic-*'</span>), <span class="hljs-string">'my-subscription'</span>)
<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
msg = consumer.receive()
<span class="hljs-keyword">try</span>:
print(<span class="hljs-string">"Received message '{}' id='{}'"</span>.format(msg.data(), msg.message_id()))
<span class="hljs-comment"># Acknowledge successful processing of the message</span>
consumer.acknowledge(msg)
<span class="hljs-keyword">except</span> Exception:
<span class="hljs-comment"># Message failed to be processed</span>
consumer.negative_acknowledge(msg)
client.close()
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="schema"></a><a href="#schema" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema</h2>
<h3><a class="anchor" aria-hidden="true" id="supported-schema-types"></a><a href="#supported-schema-types" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Supported schema types</h3>
<p>You can use different builtin schema types in Pulsar. All the definitions are in the <code>pulsar.schema</code> package.</p>
<table>
<thead>
<tr><th>Schema</th><th>Notes</th></tr>
</thead>
<tbody>
<tr><td><code>BytesSchema</code></td><td>Get the raw payload as a <code>bytes</code> object. No serialization/deserialization are performed. This is the default schema mode</td></tr>
<tr><td><code>StringSchema</code></td><td>Encode/decode payload as a UTF-8 string. Uses <code>str</code> objects</td></tr>
<tr><td><code>JsonSchema</code></td><td>Require record definition. Serializes the record into standard JSON payload</td></tr>
<tr><td><code>AvroSchema</code></td><td>Require record definition. Serializes in AVRO format</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="schema-definition-reference"></a><a href="#schema-definition-reference" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema definition reference</h3>
<p>The schema definition is done through a class that inherits from <code>pulsar.schema.Record</code>.</p>
<p>This class has a number of fields which can be of either
<code>pulsar.schema.Field</code> type or another nested <code>Record</code>. All the
fields are specified in the <code>pulsar.schema</code> package. The fields
are matching the AVRO fields types.</p>
<table>
<thead>
<tr><th>Field Type</th><th>Python Type</th><th>Notes</th></tr>
</thead>
<tbody>
<tr><td><code>Boolean</code></td><td><code>bool</code></td><td></td></tr>
<tr><td><code>Integer</code></td><td><code>int</code></td><td></td></tr>
<tr><td><code>Long</code></td><td><code>int</code></td><td></td></tr>
<tr><td><code>Float</code></td><td><code>float</code></td><td></td></tr>
<tr><td><code>Double</code></td><td><code>float</code></td><td></td></tr>
<tr><td><code>Bytes</code></td><td><code>bytes</code></td><td></td></tr>
<tr><td><code>String</code></td><td><code>str</code></td><td></td></tr>
<tr><td><code>Array</code></td><td><code>list</code></td><td>Need to specify record type for items.</td></tr>
<tr><td><code>Map</code></td><td><code>dict</code></td><td>Key is always <code>String</code>. Need to specify value type.</td></tr>
</tbody>
</table>
<p>Additionally, any Python <code>Enum</code> type can be used as a valid field type.</p>
<h4><a class="anchor" aria-hidden="true" id="fields-parameters"></a><a href="#fields-parameters" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Fields parameters</h4>
<p>When adding a field, you can use these parameters in the constructor.</p>
<table>
<thead>
<tr><th>Argument</th><th>Default</th><th>Notes</th></tr>
</thead>
<tbody>
<tr><td><code>default</code></td><td><code>None</code></td><td>Set a default value for the field. Eg: <code>a = Integer(default=5)</code></td></tr>
<tr><td><code>required</code></td><td><code>False</code></td><td>Mark the field as &quot;required&quot;. It is set in the schema accordingly.</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="schema-definition-examples"></a><a href="#schema-definition-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema definition examples</h4>
<h5><a class="anchor" aria-hidden="true" id="simple-definition"></a><a href="#simple-definition" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Simple definition</h5>
<pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Example</span><span class="hljs-params">(Record)</span>:</span>
a = String()
b = Integer()
c = Array(String())
i = Map(String())
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="using-enums"></a><a href="#using-enums" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Using enums</h5>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> enum <span class="hljs-keyword">import</span> Enum
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Color</span><span class="hljs-params">(Enum)</span>:</span>
red = <span class="hljs-number">1</span>
green = <span class="hljs-number">2</span>
blue = <span class="hljs-number">3</span>
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Example</span><span class="hljs-params">(Record)</span>:</span>
name = String()
color = Color
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="complex-types"></a><a href="#complex-types" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Complex types</h5>
<pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MySubRecord</span><span class="hljs-params">(Record)</span>:</span>
x = Integer()
y = Long()
z = String()
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Example</span><span class="hljs-params">(Record)</span>:</span>
a = String()
sub = MySubRecord()
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="set-namespace-for-avro-schema"></a><a href="#set-namespace-for-avro-schema" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Set namespace for Avro schema</h5>
<p>Set the namespace for Avro Record schema using the special field <code>_avro_namespace</code>.</p>
<pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">NamespaceDemo</span><span class="hljs-params">(Record)</span>:</span>
_avro_namespace = <span class="hljs-string">'xxx.xxx.xxx'</span>
x = String()
y = Integer()
</code></pre>
<p>The schema definition is like this.</p>
<pre><code class="hljs">{
<span class="hljs-string">'name'</span>: <span class="hljs-string">'NamespaceDemo'</span>, <span class="hljs-string">'namespace'</span>: <span class="hljs-string">'xxx.xxx.xxx'</span>, <span class="hljs-string">'type'</span>: <span class="hljs-string">'record'</span>, <span class="hljs-string">'fields'</span>: [
{<span class="hljs-string">'name'</span>: <span class="hljs-string">'x'</span>, <span class="hljs-string">'type'</span>: [<span class="hljs-string">'null'</span>, <span class="hljs-string">'string'</span>]},
{<span class="hljs-string">'name'</span>: <span class="hljs-string">'y'</span>, <span class="hljs-string">'type'</span>: [<span class="hljs-string">'null'</span>, <span class="hljs-string">'int'</span>]}
]
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="declare-and-validate-schema"></a><a href="#declare-and-validate-schema" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Declare and validate schema</h3>
<p>You can send messages using <code>BytesSchema</code>, <code>StringSchema</code>, <code>AvroSchema</code>, and <code>JsonSchema</code>.</p>
<p>Before the producer is created, the Pulsar broker validates that the existing topic schema is the correct type and that the format is compatible with the schema definition of a class. If the format of the topic schema is incompatible with the schema definition, an exception occurs in the producer creation.</p>
<p>Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.</p>
<p>Similarly, for a consumer or reader, the consumer returns an object (which is an instance of the schema record class) rather than raw bytes.</p>
<p><strong>Example</strong></p>
<pre><code class="hljs css language-python">consumer = client.subscribe(
topic=<span class="hljs-string">'my-topic'</span>,
subscription_name=<span class="hljs-string">'my-subscription'</span>,
schema=AvroSchema(Example) )
<span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>:
msg = consumer.receive()
ex = msg.value()
<span class="hljs-keyword">try</span>:
print(<span class="hljs-string">"Received message a={} b={} c={}"</span>.format(ex.a, ex.b, ex.c))
<span class="hljs-comment"># Acknowledge successful processing of the message</span>
consumer.acknowledge(msg)
<span class="hljs-keyword">except</span> Exception:
<span class="hljs-comment"># Message failed to be processed</span>
consumer.negative_acknowledge(msg)
</code></pre>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-964-tab-965" class="nav-link active" data-group="group_964" data-tab="tab-group-964-content-965">BytesSchema</div><div id="tab-group-964-tab-966" class="nav-link" data-group="group_964" data-tab="tab-group-964-content-966">StringSchema</div><div id="tab-group-964-tab-967" class="nav-link" data-group="group_964" data-tab="tab-group-964-content-967">AvroSchema</div><div id="tab-group-964-tab-968" class="nav-link" data-group="group_964" data-tab="tab-group-964-content-968">JsonSchema</div></div><div class="tab-content"><div id="tab-group-964-content-965" class="tab-pane active" data-group="group_964" tabindex="-1"><div><span><p>You can send byte data using a <code>BytesSchema</code>.</p>
<p><strong>Example</strong></p>
<pre><code class="hljs css language-python">producer = client.create_producer(<br /> <span class="hljs-string">'bytes-schema-topic'</span>,<br /> schema=BytesSchema())<br />producer.send(<span class="hljs-string">b"Hello"</span>)<br /><br />consumer = client.subscribe(<br /> <span class="hljs-string">'bytes-schema-topic'</span>,<br /> <span class="hljs-string">'sub'</span>,<br /> schema=BytesSchema())<br />msg = consumer.receive()<br />data = msg.value()<br /></code></pre>
</span></div></div><div id="tab-group-964-content-966" class="tab-pane" data-group="group_964" tabindex="-1"><div><span><p>You can send string data using a <code>StringSchema</code>.</p>
<p><strong>Example</strong></p>
<pre><code class="hljs css language-python">producer = client.create_producer(<br /> <span class="hljs-string">'string-schema-topic'</span>,<br /> schema=StringSchema())<br />producer.send(<span class="hljs-string">"Hello"</span>)<br /><br />consumer = client.subscribe(<br /> <span class="hljs-string">'string-schema-topic'</span>,<br /> <span class="hljs-string">'sub'</span>,<br /> schema=StringSchema())<br />msg = consumer.receive()<br />str = msg.value()<br /></code></pre>
</span></div></div><div id="tab-group-964-content-967" class="tab-pane" data-group="group_964" tabindex="-1"><div><span><p>You can declare an <code>AvroSchema</code> using one of the following methods.</p>
<h4><a class="anchor" aria-hidden="true" id="method-1-record"></a><a href="#method-1-record" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Method 1: Record</h4>
<p>You can declare an <code>AvroSchema</code> by passing a class that inherits
from <code>pulsar.schema.Record</code> and defines the fields as
class variables.</p>
<p><strong>Example</strong></p>
<pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Example</span><span class="hljs-params">(Record)</span>:</span><br /> a = Integer()<br /> b = Integer()<br /><br />producer = client.create_producer(<br /> <span class="hljs-string">'avro-schema-topic'</span>,<br /> schema=AvroSchema(Example))<br />r = Example(a=<span class="hljs-number">1</span>, b=<span class="hljs-number">2</span>)<br />producer.send(r)<br /><br />consumer = client.subscribe(<br /> <span class="hljs-string">'avro-schema-topic'</span>,<br /> <span class="hljs-string">'sub'</span>,<br /> schema=AvroSchema(Example))<br />msg = consumer.receive()<br />e = msg.value()<br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="method-2-json-definition"></a><a href="#method-2-json-definition" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Method 2: JSON definition</h4>
<p>You can declare an <code>AvroSchema</code> using JSON. In this case, Avro schemas are defined using JSON.</p>
<p><strong>Example</strong></p>
<p>Below is an <code>AvroSchema</code> defined using a JSON file (<em>company.avsc</em>).</p>
<pre><code class="hljs css language-json">{<br /> <span class="hljs-attr">"doc"</span>: <span class="hljs-string">"this is doc"</span>,<br /> <span class="hljs-attr">"namespace"</span>: <span class="hljs-string">"example.avro"</span>,<br /> <span class="hljs-attr">"type"</span>: <span class="hljs-string">"record"</span>,<br /> <span class="hljs-attr">"name"</span>: <span class="hljs-string">"Company"</span>,<br /> <span class="hljs-attr">"fields"</span>: [<br /> {<span class="hljs-attr">"name"</span>: <span class="hljs-string">"name"</span>, <span class="hljs-attr">"type"</span>: [<span class="hljs-string">"null"</span>, <span class="hljs-string">"string"</span>]},<br /> {<span class="hljs-attr">"name"</span>: <span class="hljs-string">"address"</span>, <span class="hljs-attr">"type"</span>: [<span class="hljs-string">"null"</span>, <span class="hljs-string">"string"</span>]},<br /> {<span class="hljs-attr">"name"</span>: <span class="hljs-string">"employees"</span>, <span class="hljs-attr">"type"</span>: [<span class="hljs-string">"null"</span>, {<span class="hljs-attr">"type"</span>: <span class="hljs-string">"array"</span>, <span class="hljs-attr">"items"</span>: {<br /> <span class="hljs-attr">"type"</span>: <span class="hljs-string">"record"</span>,<br /> <span class="hljs-attr">"name"</span>: <span class="hljs-string">"Employee"</span>,<br /> <span class="hljs-attr">"fields"</span>: [<br /> {<span class="hljs-attr">"name"</span>: <span class="hljs-string">"name"</span>, <span class="hljs-attr">"type"</span>: [<span class="hljs-string">"null"</span>, <span class="hljs-string">"string"</span>]},<br /> {<span class="hljs-attr">"name"</span>: <span class="hljs-string">"age"</span>, <span class="hljs-attr">"type"</span>: [<span class="hljs-string">"null"</span>, <span class="hljs-string">"int"</span>]}<br /> ]<br /> }}]},<br /> {<span class="hljs-attr">"name"</span>: <span class="hljs-string">"labels"</span>, <span class="hljs-attr">"type"</span>: [<span class="hljs-string">"null"</span>, {<span class="hljs-attr">"type"</span>: <span class="hljs-string">"map"</span>, <span class="hljs-attr">"values"</span>: <span class="hljs-string">"string"</span>}]}<br /> ]<br />}<br /></code></pre>
<p>You can load a schema definition from file by using [<code>avro.schema</code>]((<a href="http://avro.apache.org/docs/current/gettingstartedpython.html">http://avro.apache.org/docs/current/gettingstartedpython.html</a>) or <a href="https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema"><code>fastavro.schema</code></a>.</p>
<p>If you use the &quot;JSON definition&quot; method to declare an <code>AvroSchema</code>, pay attention to the following points:</p>
<ul>
<li><p>You need to use <a href="https://developers.google.com/edu/python/dict-files">Python dict</a> to produce and consume messages, which is different from using the &quot;Record&quot; method.</p></li>
<li><p>When generating an <code>AvroSchema</code> object, set <code>_record_cls</code> parameter to <code>None</code>.</p></li>
</ul>
<p><strong>Example</strong></p>
<pre><code class="hljs"><span class="hljs-keyword">from</span> fastavro.<span class="hljs-keyword">schema</span> <span class="hljs-keyword">import</span> load_schema<br /><span class="hljs-keyword">from</span> pulsar.<span class="hljs-keyword">schema</span> <span class="hljs-keyword">import</span> *<br />schema_definition = load_schema("examples/company.avsc")<br />avro_schema = AvroSchema(<span class="hljs-keyword">None</span>, schema_definition=schema_definition)<br />producer = client.create_producer(<br /> topic=topic,<br /> schema=avro_schema)<br />consumer = client.subscribe(topic, <span class="hljs-string">'test'</span>, schema=avro_schema)<br />company = {<br /> "name": "company-name" + str(i),<br /> "address": <span class="hljs-string">'xxx road xxx street '</span> + str(i),<br /> "employees": [<br /> {"name": "user" + str(i), "age": <span class="hljs-number">20</span> + i},<br /> {"name": "user" + str(i), "age": <span class="hljs-number">30</span> + i},<br /> {"name": "user" + str(i), "age": <span class="hljs-number">35</span> + i},<br /> ],<br /> "labels": {<br /> "industry": "software" + str(i),<br /> "scale": "&gt;100",<br /> "funds": "1000000.0"<br /> }<br />}<br />producer.send(company)<br />msg = consumer.receive()<br /># Users could <span class="hljs-keyword">get</span> a dict <span class="hljs-keyword">object</span> <span class="hljs-keyword">by</span> `<span class="hljs-keyword">value</span>()` <span class="hljs-keyword">method</span>.<br />msg.<span class="hljs-keyword">value</span>()<br /></code></pre>
</span></div></div><div id="tab-group-964-content-968" class="tab-pane" data-group="group_964" tabindex="-1"><div><span><h4><a class="anchor" aria-hidden="true" id="record"></a><a href="#record" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Record</h4>
<p>You can declare a <code>JsonSchema</code> by passing a class that inherits
from <code>pulsar.schema.Record</code> and defines the fields as class variables. This is similar to using <code>AvroSchema</code>. The only difference is to use <code>JsonSchema</code> instead of <code>AvroSchema</code> when defining schema type as shown below. For how to use <code>AvroSchema</code> via record, see <a href="/docs/en/client-libraries-python#method-1-record">here</a>.</p>
<pre><code class="hljs">producer = client.create_producer(<br /> <span class="hljs-string">'avro-schema-topic'</span>,<br /> <span class="hljs-attribute">schema</span>=JsonSchema(Example))<br /><br />consumer = client.subscribe(<br /> <span class="hljs-string">'avro-schema-topic'</span>,<br /> <span class="hljs-string">'sub'</span>,<br /> <span class="hljs-attribute">schema</span>=JsonSchema(Example))<br /></code></pre>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="end-to-end-encryption"></a><a href="#end-to-end-encryption" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>End-to-end encryption</h2>
<p><a href="https://pulsar.apache.org/docs/en/next/cookbooks-encryption/#docsNav">End-to-end encryption</a> allows applications to encrypt messages at producers and decrypt messages at consumers.</p>
<h3><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h3>
<p>To use the end-to-end encryption feature in the Python client, you need to configure <code>publicKeyPath</code> and <code>privateKeyPath</code> for both producer and consumer.</p>
<pre><code class="hljs"><span class="hljs-symbol">publicKeyPath:</span> <span class="hljs-string">"./public.pem"</span>
<span class="hljs-symbol">privateKeyPath:</span> <span class="hljs-string">"./private.pem"</span>
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="tutorial"></a><a href="#tutorial" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tutorial</h3>
<p>This section provides step-by-step instructions on how to use the end-to-end encryption feature in the Python client.</p>
<p><strong>Prerequisite</strong></p>
<ul>
<li>Pulsar Python client 2.7.1 or later</li>
</ul>
<p><strong>Step</strong></p>
<ol>
<li><p>Create both public and private key pairs.</p>
<p><strong>Input</strong></p>
<pre><code class="hljs css language-shell">openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -pubout -out public.pem
</code></pre></li>
<li><p>Create a producer to send encrypted messages.</p>
<p><strong>Input</strong></p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> pulsar
publicKeyPath = <span class="hljs-string">"./public.pem"</span>
privateKeyPath = <span class="hljs-string">"./private.pem"</span>
crypto_key_reader = pulsar.CryptoKeyReader(publicKeyPath, privateKeyPath)
client = pulsar.Client(<span class="hljs-string">'pulsar://localhost:6650'</span>)
producer = client.create_producer(topic=<span class="hljs-string">'encryption'</span>, encryption_key=<span class="hljs-string">'encryption'</span>, crypto_key_reader=crypto_key_reader)
producer.send(<span class="hljs-string">'encryption message'</span>.encode(<span class="hljs-string">'utf8'</span>))
print(<span class="hljs-string">'sent message'</span>)
producer.close()
client.close()
</code></pre></li>
<li><p>Create a consumer to receive encrypted messages.</p>
<p><strong>Input</strong></p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">import</span> pulsar
publicKeyPath = <span class="hljs-string">"./public.pem"</span>
privateKeyPath = <span class="hljs-string">"./private.pem"</span>
crypto_key_reader = pulsar.CryptoKeyReader(publicKeyPath, privateKeyPath)
client = pulsar.Client(<span class="hljs-string">'pulsar://localhost:6650'</span>)
consumer = client.subscribe(topic=<span class="hljs-string">'encryption'</span>, subscription_name=<span class="hljs-string">'encryption-sub'</span>, crypto_key_reader=crypto_key_reader)
msg = consumer.receive()
print(<span class="hljs-string">"Received msg '{}' id = '{}'"</span>.format(msg.data(), msg.message_id()))
consumer.close()
client.close()
</code></pre></li>
<li><p>Run the consumer to receive encrypted messages.</p>
<p><strong>Input</strong></p>
<pre><code class="hljs css language-shell">python consumer.py
</code></pre></li>
<li><p>In a new terminal tab, run the producer to produce encrypted messages.</p>
<p><strong>Input</strong></p>
<pre><code class="hljs css language-shell">python producer.py
</code></pre>
<p>Now you can see the producer sends messages and the consumer receives messages successfully.</p>
<p><strong>Output</strong></p>
<p>This is from the producer side.</p>
<pre><code class="hljs">sent <span class="hljs-keyword">message</span>
</code></pre>
<p>This is from the consumer side.</p>
<pre><code class="hljs">Received msg 'encryption message' id = '(<span class="hljs-number">0</span>,<span class="hljs-number">0</span>,<span class="hljs-number">-1</span>,<span class="hljs-number">-1</span>)'
</code></pre></li>
</ol>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/client-libraries-go"><span class="arrow-prev"></span><span>Go</span></a><a class="docs-next button" href="/docs/en/client-libraries-cpp"><span>C++</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#install">Install</a><ul class="toc-headings"><li><a href="#install-using-pip">Install using pip</a></li><li><a href="#optional-dependencies">Optional dependencies</a></li><li><a href="#install-from-source">Install from source</a></li></ul></li><li><a href="#api-reference">API Reference</a></li><li><a href="#examples">Examples</a><ul class="toc-headings"><li><a href="#producer-example">Producer example</a></li><li><a href="#consumer-example">Consumer example</a></li><li><a href="#reader-interface-example">Reader interface example</a></li><li><a href="#multi-topic-subscriptions">Multi-topic subscriptions</a></li></ul></li><li><a href="#schema">Schema</a><ul class="toc-headings"><li><a href="#supported-schema-types">Supported schema types</a></li><li><a href="#schema-definition-reference">Schema definition reference</a></li><li><a href="#declare-and-validate-schema">Declare and validate schema</a></li></ul></li><li><a href="#end-to-end-encryption">End-to-end encryption</a><ul class="toc-headings"><li><a href="#configuration">Configuration</a></li><li><a href="#tutorial">Tutorial</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>