blob: b64abd48fcbb4f85342a071433226bc3d3c2322c [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>The Pulsar Functions API · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="[Pulsar Functions](/docs/en/2.1.0-incubating/functions-overview) provides an easy-to-use API that developers can use to create and manage processing logic for the Apache Pulsar messaging system. With Pulsar Functions, you can write functions of any level of complexity in [Java](#functions-for-java) or [Python](#functions-for-python) and run them in conjunction with a Pulsar cluster without needing to run a separate stream processing engine."/><meta name="docsearch:version" content="2.1.0-incubating"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="The Pulsar Functions API · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="[Pulsar Functions](/docs/en/2.1.0-incubating/functions-overview) provides an easy-to-use API that developers can use to create and manage processing logic for the Apache Pulsar messaging system. 
With Pulsar Functions, you can write functions of any level of complexity in [Java](#functions-for-java) or [Python](#functions-for-python) and run them in conjunction with a Pulsar cluster without needing to run a separate stream processing engine."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.1.0-incubating</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.1.0-incubating/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.1.0-incubating/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST 
APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.1.0-incubating/functions-api">日本語</a></li><li><a href="/docs/fr/2.1.0-incubating/functions-api">Français</a></li><li><a href="/docs/ko/2.1.0-incubating/functions-api">한국어</a></li><li><a href="/docs/zh-CN/2.1.0-incubating/functions-api">中文</a></li><li><a href="/docs/zh-TW/2.1.0-incubating/functions-api">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
// Clicking the current-language entry flips the dropdown between its two
// states: exactly "hide" becomes "visible"; any other class value becomes
// "hide". The entry's href is "#", so the default navigation is cancelled.
languagesMenuItem.addEventListener("click", function (evt) {
  evt.preventDefault();
  const currentlyHidden = languagesDropDown.className == "hide";
  languagesDropDown.className = currentlyHidden ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Pulsar Functions</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.1.0-incubating/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-quickstart">Getting started</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/functions-metrics">Metrics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.1.0-incubating/io-develop">Developing Connectors</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-load-distribution">Load distribution</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.1.0-incubating/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.1.0-incubating/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.1.0-incubating/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/cookbooks-message-queue">Message queue</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.1.0-incubating/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script>
// Sidebar behaviour for the docs navigation.
//
// 1. Each element with class "collapsible" controls its next element sibling.
//    On load, the first such section that contains an element with class
//    "navListItemActive" has its "hide" class toggled and its arrow
//    (childNodes[1]) gets "rotate" toggled. Clicking a collapsible toggles
//    the same two classes.
// 2. Once the DOM is ready, wire up the nav toggler (#navToggler -> #docsNav)
//    and the TOC toggler (#tocToggler -> body), and remove "tocActive" from
//    <body> when a link inside .toc-headings is clicked.
//
// Malformed markup (a collapsible with no following section, a missing arrow
// node, or a toggler whose target does not exist) is skipped instead of
// throwing. An exception here would abort the whole script, including the
// DOMContentLoaded registration below.
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
  if (!coll[i].nextElementSibling) {
    // Nothing to collapse; leave this entry inert.
    continue;
  }
  var links = coll[i].nextElementSibling.getElementsByTagName('*');
  if (checkActiveCategory) {
    for (var j = 0; j < links.length; j++) {
      if (links[j].classList.contains('navListItemActive')) {
        coll[i].nextElementSibling.classList.toggle('hide');
        var activeArrow = coll[i].childNodes[1];
        // childNodes may include text nodes, which have no classList.
        if (activeArrow && activeArrow.classList) {
          activeArrow.classList.toggle('rotate');
        }
        checkActiveCategory = false;
        break;
      }
    }
  }
  coll[i].addEventListener('click', function() {
    var arrow = this.childNodes[1];
    if (arrow && arrow.classList) {
      arrow.classList.toggle('rotate');
    }
    var content = this.nextElementSibling;
    if (content) {
      content.classList.toggle('hide');
    }
  });
}
document.addEventListener('DOMContentLoaded', function() {
  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');
  var headings = document.querySelector('.toc-headings');
  headings && headings.addEventListener('click', function(event) {
    // Walk up from the click target; if an <a> inside the TOC was clicked,
    // close the TOC panel.
    var el = event.target;
    while (el && el !== headings) {
      if (el.tagName === 'A') {
        document.body.classList.remove('tocActive');
        break;
      } else {
        el = el.parentNode;
      }
    }
  }, false);

  // Make the element matching togglerSelector toggle `className` on the
  // element matching targetSelector when clicked. Does nothing if either
  // element is absent from the page.
  function createToggler(togglerSelector, targetSelector, className) {
    var toggler = document.querySelector(togglerSelector);
    var target = document.querySelector(targetSelector);
    if (!toggler || !target) {
      return;
    }
    toggler.onclick = function(event) {
      event.preventDefault();
      target.classList.toggle(className);
    };
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/functions-api.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">The Pulsar Functions API</h1></header><article><div><span><p><a href="/docs/en/2.1.0-incubating/functions-overview">Pulsar Functions</a> provides an easy-to-use API that developers can use to create and manage processing logic for the Apache Pulsar messaging system. With Pulsar Functions, you can write functions of any level of complexity in <a href="#functions-for-java">Java</a> or <a href="#functions-for-python">Python</a> and run them in conjunction with a Pulsar cluster without needing to run a separate stream processing engine.</p>
<blockquote>
<p>For a more in-depth overview of the Pulsar Functions feature, see the <a href="/docs/en/2.1.0-incubating/functions-overview">Pulsar Functions overview</a>.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="core-programming-model"></a><a href="#core-programming-model" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Core programming model</h2>
<p>Pulsar Functions provide a wide range of functionality but are based on a very simple programming model. You can think of Pulsar Functions as lightweight processes that</p>
<ul>
<li>consume messages from one or more Pulsar topics and then</li>
<li>apply some user-defined processing logic to each incoming message. That processing logic could be just about anything you want, including
<ul>
<li>producing the resulting, processed message on another Pulsar topic, or</li>
<li>doing something else with the message, such as writing results to an external database.</li>
</ul></li>
</ul>
<p>You could use Pulsar Functions, for example, to set up the following processing chain:</p>
<ul>
<li>A <a href="#functions-for-python">Python</a> function listens on the <code>raw-sentences</code> topic and &quot;<a href="#example-function">sanitizes</a>&quot; incoming strings (removing extraneous whitespace and converting all characters to lower case) and then publishes the results to a <code>sanitized-sentences</code> topic</li>
<li>A <a href="#functions-for-java">Java</a> function listens on the <code>sanitized-sentences</code> topic, counts the number of times each word appears within a specified time window, and publishes the results to a <code>results</code> topic</li>
<li>Finally, a Python function listens on the <code>results</code> topic and writes the results to a MySQL table</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="example-function"></a><a href="#example-function" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example function</h3>
<p>Here's an example &quot;input sanitizer&quot; function written in Python and stored in a <code>sanitizer.py</code> file:</p>
<pre><code class="hljs css language-python"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">clean_string</span><span class="hljs-params">(s)</span>:</span>
<span class="hljs-keyword">return</span> s.strip().lower()
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(input)</span>:</span>
<span class="hljs-keyword">return</span> clean_string(input)
</code></pre>
<p>Some things to note about this Pulsar Function:</p>
<ul>
<li>There is no client, producer, or consumer object involved. All message &quot;plumbing&quot; is already taken care of for you, enabling you to worry only about processing logic.</li>
<li>No topics, subscription types, tenants, or namespaces are specified in the function logic itself. Instead, topics are specified upon <a href="#example-deployment">deployment</a>. This means that you can use and re-use Pulsar Functions across topics, tenants, and namespaces without needing to hard-code those attributes.</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="example-deployment"></a><a href="#example-deployment" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example deployment</h3>
<p>Deploying Pulsar Functions is handled by the <a href="/docs/en/2.1.0-incubating/reference-pulsar-admin"><code>pulsar-admin</code></a> CLI tool, in particular the <a href="/docs/en/2.1.0-incubating/reference-pulsar-admin#functions"><code>functions</code></a> command. Here's an example command that would run our <a href="#example-function">sanitizer</a> function from above in <a href="/docs/en/2.1.0-incubating/functions-deploying#local-run-mode">local run</a> mode:</p>
<pre><code class="hljs css language-bash"><span class="hljs-comment"># --py:        the Python file with the function's code</span>
<span class="hljs-comment"># --className: the class or function holding the processing logic</span>
<span class="hljs-comment"># --tenant:    the function's tenant (derived from the topic name by default)</span>
<span class="hljs-comment"># --namespace: the function's namespace (derived from the topic name by default)</span>
<span class="hljs-comment"># --name:      the name of the function (the class name by default)</span>
<span class="hljs-comment"># --inputs:    the input topic(s) for the function</span>
<span class="hljs-comment"># --output:    the output topic for the function</span>
<span class="hljs-comment"># --logTopic:  the topic to which all function logs are published</span>
$ bin/pulsar-admin <span class="hljs-built_in">functions</span> localrun \
--py sanitizer.py \
--className sanitizer \
--tenant public \
--namespace default \
--name sanitizer-function \
--inputs dirty-strings-in \
--output clean-strings-out \
--logTopic sanitizer-logs
</code></pre>
<p>For instructions on running functions in your Pulsar cluster, see the <a href="/docs/en/2.1.0-incubating/functions-deploying">Deploying Pulsar Functions</a> guide.</p>
<h3><a class="anchor" aria-hidden="true" id="available-apis"></a><a href="#available-apis" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Available APIs</h3>
<p>In both Java and Python, you have two options for writing Pulsar Functions:</p>
<table>
<thead>
<tr><th style="text-align:left">Interface</th><th style="text-align:left">Description</th><th style="text-align:left">Use cases</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Language-native interface</td><td style="text-align:left">No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python)</td><td style="text-align:left">Functions that don't require access to the function's <a href="#context">context</a></td></tr>
<tr><td style="text-align:left">Pulsar Functions SDK for Java/Python</td><td style="text-align:left">Pulsar-specific libraries that provide a range of functionality not provided by &quot;native&quot; interfaces</td><td style="text-align:left">Functions that require access to the function's <a href="#context">context</a></td></tr>
</tbody>
</table>
<p>In Python, for example, this language-native function, which adds an exclamation point to all incoming strings and publishes the resulting string to a topic, would have no external dependencies:</p>
<pre><code class="hljs css language-python"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(input)</span>:</span>
<span class="hljs-keyword">return</span> <span class="hljs-string">"{}!"</span>.format(input)
</code></pre>
<p>This function, however, would use the Pulsar Functions <a href="#python-sdk-functions">SDK for Python</a>:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">DisplayFunctionName</span><span class="hljs-params">(Function)</span>:</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span>
function_name = context.function_name()
<span class="hljs-keyword">return</span> <span class="hljs-string">"The function processing this message has the name {0}"</span>.format(function_name)
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="serialization-and-deserialization-serde"></a><a href="#serialization-and-deserialization-serde" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Serialization and deserialization (SerDe)</h3>
<p>SerDe stands for <strong>Ser</strong>ialization and <strong>De</strong>serialization. All Pulsar Functions use SerDe for message handling. How SerDe works by default depends on the language you're using for a particular function:</p>
<ul>
<li>In <a href="#python-serde">Python</a>, the default SerDe is identity, meaning that the type is serialized as whatever type the producer function returns</li>
<li>In <a href="#java-serde">Java</a>, a number of commonly used types (<code>String</code>s, <code>Integer</code>s, etc.) are supported by default</li>
</ul>
<p>In both languages, however, you can write your own custom SerDe logic for more complex, application-specific types. See the docs for <a href="#java-serde">Java</a> and <a href="#python-serde">Python</a> for language-specific instructions.</p>
<h3><a class="anchor" aria-hidden="true" id="context"></a><a href="#context" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Context</h3>
<p>Both the <a href="#java-sdk-functions">Java</a> and <a href="#python-sdk-functions">Python</a> SDKs provide access to a <strong>context object</strong> that can be used by the function. This context object provides a wide variety of information and functionality to the function:</p>
<ul>
<li>The name and ID of the Pulsar Function</li>
<li>The message ID of each message. Each Pulsar message is automatically assigned an ID.</li>
<li>The name of the topic on which the message was sent</li>
<li>The names of all input topics as well as the output topic associated with the function</li>
<li>The name of the class used for <a href="#serialization-and-deserialization-serde">SerDe</a></li>
<li>The <a href="/docs/en/2.1.0-incubating/reference-terminology#tenant">tenant</a> and namespace associated with the function</li>
<li>The ID of the Pulsar Functions instance running the function</li>
<li>The version of the function</li>
<li>The <a href="/docs/en/2.1.0-incubating/functions-overview#logging">logger object</a> used by the function, which can be used to create function log messages</li>
<li>Access to arbitrary <a href="#user-config">user config</a> values supplied via the CLI</li>
<li>An interface for recording <a href="/docs/en/2.1.0-incubating/functions-metrics">metrics</a></li>
<li>An interface for storing and retrieving state in <a href="/docs/en/2.1.0-incubating/functions-overview#state-storage">state storage</a></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="user-config"></a><a href="#user-config" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>User config</h3>
<p>When you run or update Pulsar Functions created using the <a href="#available-apis">SDK</a>, you can pass arbitrary key/value pairs to them via the command line with the <code>--userConfig</code> flag. The key/value pairs must be specified as a JSON object. Here's an example of a function creation command that passes a user config key/value pair to a function:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--name word-filter \
<span class="hljs-comment"># Other function configs</span>
--userConfig <span class="hljs-string">'{"forbidden-word":"rosebud"}'</span>
</code></pre>
<p>If the function were a Python function, that config value could be accessed like this:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordFilter</span><span class="hljs-params">(Function)</span>:</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, context, input)</span>:</span>
forbidden_word = context.user_config()[<span class="hljs-string">"forbidden-word"</span>]
<span class="hljs-comment"># Don't publish the message if it contains the user-supplied</span>
<span class="hljs-comment"># forbidden word</span>
<span class="hljs-keyword">if</span> forbidden_word <span class="hljs-keyword">in</span> input:
<span class="hljs-keyword">pass</span>
<span class="hljs-comment"># Otherwise publish the message</span>
<span class="hljs-keyword">else</span>:
<span class="hljs-keyword">return</span> input
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="functions-for-java"></a><a href="#functions-for-java" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Functions for Java</h2>
<p>Writing Pulsar Functions in Java involves implementing one of two interfaces:</p>
<ul>
<li>The <a href="https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html"><code>java.util.function.Function</code></a> interface</li>
<li>The <a href="https://pulsar.apache.org/api/pulsar-functions/2.1.0-incubating/org/apache/pulsar/functions/api/Function">Function</a>
interface. This interface works much like the <code>java.util.function.Function</code> interface, but with the important difference that it provides a <a href="https://pulsar.apache.org/api/pulsar-functions/2.1.0-incubating/org/apache/pulsar/functions/api/Context">Context</a>
object that you can use in a <a href="#context">variety of ways</a></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="getting-started"></a><a href="#getting-started" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Getting started</h3>
<p>In order to write Pulsar Functions in Java, you'll need to install the proper <a href="#dependencies">dependencies</a> and package your function <a href="#packaging">as a JAR</a>.</p>
<h4><a class="anchor" aria-hidden="true" id="dependencies"></a><a href="#dependencies" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Dependencies</h4>
<p>How you get started writing Pulsar Functions in Java depends on which API you're using:</p>
<ul>
<li><p>If you're writing a <a href="#java-native-functions">Java native function</a>, you won't need any external dependencies.</p></li>
<li><p>If you're writing a <a href="#java-sdk-functions">Java SDK function</a>, you'll need to import the <code>pulsar-functions-api</code> library.</p>
<p>Here's an example for a Maven <code>pom.xml</code> configuration file:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.pulsar<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>pulsar-functions-api<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>2.1.0-incubating<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<p>Here's an example for a Gradle <code>build.gradle</code> configuration file:</p>
<pre><code class="hljs css language-groovy">dependencies {
compile <span class="hljs-string">group:</span> <span class="hljs-string">'org.apache.pulsar'</span>, <span class="hljs-string">name:</span> <span class="hljs-string">'pulsar-functions-api'</span>, <span class="hljs-string">version:</span> <span class="hljs-string">'2.1.0-incubating'</span>
}
</code></pre></li>
</ul>
<h4><a class="anchor" aria-hidden="true" id="packaging"></a><a href="#packaging" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Packaging</h4>
<p>Whether you're writing Java Pulsar Functions using the <a href="#java-native-functions">native</a> Java <code>java.util.function.Function</code> interface or using the <a href="#java-sdk-functions">Java SDK</a>, you'll need to package your function(s) as a &quot;fat&quot; JAR.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="starter-repo"></a><a href="#starter-repo" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Starter repo</h4>
<p>If you'd like to get up and running quickly, you can use <a href="https://github.com/streamlio/pulsar-functions-java-starter">this repo</a>, which contains the necessary Maven configuration to build a fat JAR as well as some example functions.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="java-native-functions"></a><a href="#java-native-functions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java native functions</h3>
<p>If your function doesn't require access to its <a href="#context">context</a>, you can create a Pulsar Function by implementing the <a href="https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html"><code>java.util.function.Function</code></a> interface, which has this very simple, single-method signature:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">I</span>, <span class="hljs-title">O</span>&gt; </span>{
<span class="hljs-function">O <span class="hljs-title">apply</span><span class="hljs-params">(I input)</span></span>;
}
</code></pre>
<p>Here's an example function that takes a string as its input, adds an exclamation point to the end of the string, and then publishes the resulting string:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> java.util.function.Function;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ExclamationFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">apply</span><span class="hljs-params">(String input)</span> </span>{
<span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s!"</span>, input);
}
}
</code></pre>
<p>In general, you should use native functions when you don't need access to the function's <a href="#context">context</a>. If you <em>do</em> need access to the function's context, then we recommend using the <a href="#java-sdk-functions">Pulsar Functions Java SDK</a>.</p>
<h4><a class="anchor" aria-hidden="true" id="java-native-examples"></a><a href="#java-native-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java native examples</h4>
<p>There is one example Java native function in this <a href="https://github.com/apache/pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples">folder</a>:</p>
<ul>
<li><a href="https://github.com/apache/pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclamationFunction.java"><code>JavaNativeExclamationFunction</code></a>
</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="java-sdk-functions"></a><a href="#java-sdk-functions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java SDK functions</h3>
<p>To get started developing Pulsar Functions using the Java SDK, you'll need to add a dependency on the <code>pulsar-functions-api</code> artifact to your project. Instructions can be found <a href="#dependencies">above</a>.</p>
<blockquote>
<p>An easy way to get up and running with Pulsar Functions in Java is to clone the <a href="https://github.com/streamlio/pulsar-functions-java-starter"><code>pulsar-functions-java-starter</code></a> repo and follow the instructions there.</p>
</blockquote>
<h4><a class="anchor" aria-hidden="true" id="java-sdk-examples"></a><a href="#java-sdk-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java SDK examples</h4>
<p>There are several example Java SDK functions in this <a href="https://github.com/apache/pulsar/tree/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples">folder</a>:</p>
<table>
<thead>
<tr><th style="text-align:left">Function name</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextFunction.java"><code>ContextFunction</code></a></td><td style="text-align:left">Illustrates <a href="#context">context</a>-specific functionality like <a href="#java-logging">logging</a> and <a href="#java-metrics">metrics</a></td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java"><code>WordCountFunction</code></a></td><td style="text-align:left">Illustrates usage of Pulsar Function <a href="/docs/en/2.1.0-incubating/functions-overview#state-storage">state-storage</a></td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java"><code>ExclamationFunction</code></a></td><td style="text-align:left">A basic string manipulation function for the Java SDK</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java"><code>LoggingFunction</code></a></td><td style="text-align:left">A function that shows how <a href="#java-logging">logging</a> works for Java</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java"><code>PublishFunction</code></a></td><td style="text-align:left">Publishes results to a topic specified in the function's <a href="#java-user-config">user config</a> (rather than on the function's output topic)</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java"><code>UserConfigFunction</code></a></td><td style="text-align:left">A function that consumes <a href="#java-user-config">user-supplied configuration</a> values</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java"><code>UserMetricFunction</code></a></td><td style="text-align:left">A function that records metrics</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java"><code>VoidFunction</code></a></td><td style="text-align:left">A simple <a href="#void-functions">void function</a></td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="java-context-object"></a><a href="#java-context-object" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java context object</h3>
<p>The <a href="https://pulsar.apache.org/api/pulsar-functions/2.1.0-incubating/org/apache/pulsar/functions/api/Context">Context</a>
interface provides a number of methods that you can use to access the function's <a href="#context">context</a>. The various method signatures for the <code>Context</code> interface are listed below:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">Context</span> </span>{
<span class="hljs-keyword">byte</span>[] getMessageId();
<span class="hljs-function">String <span class="hljs-title">getTopicName</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">Collection&lt;String&gt; <span class="hljs-title">getInputTopics</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getOutputTopic</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getOutputSerdeClassName</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getTenant</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getNamespace</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getFunctionName</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getFunctionId</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getInstanceId</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">String <span class="hljs-title">getFunctionVersion</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">Logger <span class="hljs-title">getLogger</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">Map&lt;String, String&gt; <span class="hljs-title">getUserConfigMap</span><span class="hljs-params">()</span></span>;
<span class="hljs-function">Optional&lt;String&gt; <span class="hljs-title">getUserConfigValue</span><span class="hljs-params">(String key)</span></span>;
<span class="hljs-function">String <span class="hljs-title">getUserConfigValueOrDefault</span><span class="hljs-params">(String key, String defaultValue)</span></span>;
<span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">recordMetric</span><span class="hljs-params">(String metricName, <span class="hljs-keyword">double</span> value)</span></span>;
&lt;O&gt; <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">publish</span><span class="hljs-params">(String topicName, O object, String serDeClassName)</span></span>;
&lt;O&gt; <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">publish</span><span class="hljs-params">(String topicName, O object)</span></span>;
<span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">ack</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] messageId, String topic)</span></span>;
}
</code></pre>
<p>Here's an example function that uses several methods available via the <code>Context</code> object:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;
<span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;
<span class="hljs-keyword">import</span> org.slf4j.Logger;
<span class="hljs-keyword">import</span> java.util.stream.Collectors;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ContextFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{
<span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{
Logger LOG = context.getLogger();
String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(<span class="hljs-string">", "</span>));
String functionName = context.getFunctionName();
String logMessage = String.format(<span class="hljs-string">"A message with a value of \"%s\" has arrived on one of the following topics: %s\n"</span>,
input,
inputTopics);
LOG.info(logMessage);
String metricName = String.format(<span class="hljs-string">"function-%s-messages-received"</span>, functionName);
context.recordMetric(metricName, <span class="hljs-number">1</span>);
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
}
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="void-functions"></a><a href="#void-functions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Void functions</h3>
<p>Pulsar Functions can publish results to an output topic, but this isn't required. You can also have functions that simply produce a log, write results to a database, etc. Here's a function that writes a simple log every time a message is received:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;
<span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;
<span class="hljs-keyword">import</span> org.slf4j.Logger;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LogFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{
<span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{
Logger LOG = context.getLogger();
LOG.info(<span class="hljs-string">"The following message was received: {}"</span>, input);
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
}
}
</code></pre>
<blockquote>
<p>When using Java functions in which the output type is <code>Void</code>, the function must <em>always</em> return <code>null</code>.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="java-serde"></a><a href="#java-serde" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java SerDe</h3>
<p>Pulsar Functions use <a href="#serialization-and-deserialization-serde">SerDe</a> when publishing data to and consuming data from Pulsar topics. When you're writing Pulsar Functions in Java, the following basic Java types are built in and supported by default:</p>
<ul>
<li><code>String</code></li>
<li><code>Double</code></li>
<li><code>Integer</code></li>
<li><code>Float</code></li>
<li><code>Long</code></li>
<li><code>Short</code></li>
<li><code>Byte</code></li>
</ul>
<p>In addition to these built-in types, you can use your own custom types. To do so, you need to provide a SerDe class that implements this interface:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">SerDe</span>&lt;<span class="hljs-title">T</span>&gt; </span>{
<span class="hljs-function">T <span class="hljs-title">deserialize</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] input)</span></span>;
<span class="hljs-keyword">byte</span>[] serialize(T input);
}
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="java-serde-example"></a><a href="#java-serde-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java SerDe example</h4>
<p>Imagine that you're writing Pulsar Functions in Java that are processing tweet objects. Here's a simple example <code>Tweet</code> class:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Tweet</span> </span>{
<span class="hljs-keyword">private</span> String username;
<span class="hljs-keyword">private</span> String tweetContent;
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">Tweet</span><span class="hljs-params">(String username, String tweetContent)</span> </span>{
<span class="hljs-keyword">this</span>.username = username;
<span class="hljs-keyword">this</span>.tweetContent = tweetContent;
}
<span class="hljs-comment">// Standard setters and getters</span>
}
</code></pre>
<p>In order to be able to pass <code>Tweet</code> objects directly between Pulsar Functions, you'll need to provide a custom SerDe class. In the example below, <code>Tweet</code> objects are basically strings in which the username and tweet content are separated by a <code>|</code>.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">package</span> com.example.serde;
<span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.SerDe;
<span class="hljs-keyword">import</span> java.util.regex.Pattern;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TweetSerde</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">SerDe</span>&lt;<span class="hljs-title">Tweet</span>&gt; </span>{
<span class="hljs-function"><span class="hljs-keyword">public</span> Tweet <span class="hljs-title">deserialize</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] input)</span> </span>{
String s = <span class="hljs-keyword">new</span> String(input);
String[] fields = s.split(Pattern.quote(<span class="hljs-string">"|"</span>));
<span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Tweet(fields[<span class="hljs-number">0</span>], fields[<span class="hljs-number">1</span>]);
}
<span class="hljs-keyword">public</span> <span class="hljs-keyword">byte</span>[] serialize(Tweet input) {
<span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s|%s"</span>, input.getUsername(), input.getTweetContent()).getBytes();
}
}
</code></pre>
<p>To apply this custom SerDe to a particular Pulsar Function, you would need to:</p>
<ul>
<li>Package the <code>Tweet</code> and <code>TweetSerde</code> classes into a JAR</li>
<li>Specify a path to the JAR and SerDe class name when deploying the function</li>
</ul>
<p>Here's an example <a href="/docs/en/2.1.0-incubating/reference-pulsar-admin#create-1"><code>create</code></a> operation:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--jar /path/to/your.jar \
--outputSerdeClassName com.example.serde.TweetSerde \
<span class="hljs-comment"># Other function attributes</span>
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="custom-serde-classes-must-be-packaged-with-your-function-jars"></a><a href="#custom-serde-classes-must-be-packaged-with-your-function-jars" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Custom SerDe classes must be packaged with your function JARs</h4>
<p>Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. That means that you'll need to always include your SerDe classes in your function JARs. If not, Pulsar will return an error.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="java-logging"></a><a href="#java-logging" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java logging</h3>
<p>Pulsar Functions that use the <a href="#java-sdk-functions">Java SDK</a> have access to an <a href="https://www.slf4j.org/">SLF4J</a> <a href="https://www.slf4j.org/api/org/slf4j/Logger.html"><code>Logger</code></a> object that can be used to produce logs at the chosen log level. Here's a simple example function that logs either a <code>WARN</code>- or <code>INFO</code>-level log based on whether the incoming string contains the word <code>danger</code>:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;
<span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;
<span class="hljs-keyword">import</span> org.slf4j.Logger;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LoggingFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{
Logger LOG = context.getLogger();
String messageId = <span class="hljs-keyword">new</span> String(context.getMessageId());
<span class="hljs-keyword">if</span> (input.contains(<span class="hljs-string">"danger"</span>)) {
LOG.warn(<span class="hljs-string">"A warning was received in message {}"</span>, messageId);
} <span class="hljs-keyword">else</span> {
LOG.info(<span class="hljs-string">"Message {} received\nContent: {}"</span>, messageId, input);
}
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
}
}
</code></pre>
<p>If you want your function to produce logs, you need to specify a log topic when creating or running the function. Here's an example:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--jar my-functions.jar \
--className my.package.LoggingFunction \
--logTopic persistent://public/default/logging-function-logs \
<span class="hljs-comment"># Other function configs</span>
</code></pre>
<p>Now, all logs produced by the <code>LoggingFunction</code> above can be accessed via the <code>persistent://public/default/logging-function-logs</code> topic.</p>
<h3><a class="anchor" aria-hidden="true" id="java-user-config"></a><a href="#java-user-config" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java user config</h3>
<p>The Java SDK's <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to the Pulsar Function via the command line (as JSON). Here's an example function creation command that passes a key/value pair:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--userConfig <span class="hljs-string">'{"word-of-the-day":"verdure"}'</span> \
<span class="hljs-comment"># Other function configs</span>
</code></pre>
<p>To access that value in a Java function:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;
<span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;
<span class="hljs-keyword">import</span> org.slf4j.Logger;
<span class="hljs-keyword">import</span> java.util.Optional;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UserConfigFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{
Logger LOG = context.getLogger();
Optional&lt;String&gt; wotd = context.getUserConfigValue(<span class="hljs-string">"word-of-the-day"</span>);
<span class="hljs-keyword">if</span> (wotd.isPresent()) {
LOG.info(<span class="hljs-string">"The word of the day is {}"</span>, wotd.get());
} <span class="hljs-keyword">else</span> {
LOG.warn(<span class="hljs-string">"No word of the day provided"</span>);
}
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
}
}
</code></pre>
<p>The <code>UserConfigFunction</code> function will log the string <code>&quot;The word of the day is verdure&quot;</code> every time the function is invoked (i.e. every time a message arrives). The <code>word-of-the-day</code> user config will be changed only when the function is updated with a new config value via the command line.</p>
<p>You can also access the entire user config map or set a default value in case no value is present:</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Get the whole config map</span>
Map&lt;String, String&gt; allConfigs = context.getUserConfigMap();
<span class="hljs-comment">// Get value or resort to default</span>
String wotd = context.getUserConfigValueOrDefault(<span class="hljs-string">"word-of-the-day"</span>, <span class="hljs-string">"perspicacious"</span>);
</code></pre>
<blockquote>
<p>For all key/value pairs passed to Java Pulsar Functions, both the key <em>and</em> the value are <code>String</code>s. If you'd like the value to be of a different type, you will need to deserialize from the <code>String</code> type.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="java-metrics"></a><a href="#java-metrics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Java metrics</h3>
<p>You can record metrics using the <a href="#context"><code>Context</code></a> object on a per-key basis. You can, for example, set a metric for the key <code>hit-count</code> and a different metric for the key <code>elevens-count</code> every time the function processes a message. Here's an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;
<span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MetricRecorderFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">Integer</span>, <span class="hljs-title">Void</span>&gt; </span>{
<span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(Integer input, Context context)</span> </span>{
<span class="hljs-comment">// Records the metric 1 every time a message arrives</span>
context.recordMetric(<span class="hljs-string">"hit-count"</span>, <span class="hljs-number">1</span>);
<span class="hljs-comment">// Records the metric only if the arriving number equals 11</span>
<span class="hljs-keyword">if</span> (input == <span class="hljs-number">11</span>) {
context.recordMetric(<span class="hljs-string">"elevens-count"</span>, <span class="hljs-number">1</span>);
}
<span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;
}
}
</code></pre>
<blockquote>
<p>For instructions on reading and using metrics, see the <a href="/docs/en/2.1.0-incubating/deploy-monitoring">Monitoring</a> guide.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="functions-for-python"></a><a href="#functions-for-python" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Functions for Python</h2>
<p>Writing Pulsar Functions in Python entails implementing one of two things:</p>
<ul>
<li>A <code>process</code> function that takes an input (message data from the function's input topic(s)), applies some kind of logic to it, and either returns an object (to be published to the function's output topic) or <code>pass</code>es and thus doesn't produce a message</li>
<li>A <code>Function</code> class that has a <code>process</code> method that provides a message input to process and a <a href="#context">context</a> object</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="getting-started-1"></a><a href="#getting-started-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Getting started</h3>
<p>Regardless of which <a href="/docs/en/2.1.0-incubating/functions-deploying">deployment mode</a> you're using, you'll need to install the following Python libraries on any machine that's running Pulsar Functions written in Python:</p>
<ul>
<li>pulsar-client</li>
<li>protobuf</li>
<li>futures</li>
<li>grpcio</li>
<li>grpcio-tools</li>
</ul>
<p>That could be your local machine for <a href="/docs/en/2.1.0-incubating/functions-deploying#local-run-mode">local run mode</a> or a machine running a Pulsar <a href="/docs/en/2.1.0-incubating/reference-terminology#broker">broker</a> for <a href="/docs/en/2.1.0-incubating/functions-deploying#cluster-mode">cluster mode</a>. To install those libraries using pip:</p>
<pre><code class="hljs css language-bash">$ pip install pulsar-client protobuf futures grpcio grpcio-tools
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="packaging-1"></a><a href="#packaging-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Packaging</h3>
<p>At the moment, the code for Pulsar Functions written in Python must be contained within a single Python file. In the future, Pulsar Functions may support other packaging formats, such as <a href="https://github.com/pantsbuild/pex"><strong>P</strong>ython <strong>EX</strong>ecutables</a> (PEXes).</p>
<h3><a class="anchor" aria-hidden="true" id="python-native-functions"></a><a href="#python-native-functions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python native functions</h3>
<p>If your function doesn't require access to its <a href="#context">context</a>, you can create a Pulsar Function by implementing a <code>process</code> function, which provides a single input object that you can process however you wish. Here's an example function that takes a string as its input, adds an exclamation point at the end of the string, and then publishes the resulting string:</p>
<pre><code class="hljs css language-python"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(input)</span>:</span>
<span class="hljs-keyword">return</span> <span class="hljs-string">"{0}!"</span>.format(input)
</code></pre>
<p>In general, you should use native functions when you don't need access to the function's <a href="#context">context</a>. If you <em>do</em> need access to the function's context, then we recommend using the <a href="#python-sdk-functions">Pulsar Functions Python SDK</a>.</p>
<h4><a class="anchor" aria-hidden="true" id="python-native-examples"></a><a href="#python-native-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python native examples</h4>
<p>There is one example Python native function in this <a href="https://github.com/apache/pulsar/tree/master/pulsar-functions/python-examples">folder</a>:</p>
<ul>
<li><a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/native_exclamation_function.py"><code>native_exclamation_function.py</code></a>
</li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="python-sdk-functions"></a><a href="#python-sdk-functions" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python SDK functions</h3>
<p>To get started developing Pulsar Functions using the Python SDK, you'll need to install the <a href="/api/python/2.1.0-incubating"><code>pulsar-client</code></a> library using the instructions <a href="#getting-started-1">above</a>.</p>
<h4><a class="anchor" aria-hidden="true" id="python-sdk-examples"></a><a href="#python-sdk-examples" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python SDK examples</h4>
<p>There are several example Python functions in this <a href="https://github.com/apache/pulsar/tree/master/pulsar-functions/python-examples">folder</a>:</p>
<table>
<thead>
<tr><th style="text-align:left">Function file</th><th style="text-align:left">Description</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/python-examples/exclamation_function.py"><code>exclamation_function.py</code></a></td><td style="text-align:left">Adds an exclamation point at the end of each incoming string</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/python-examples/logging_function.py"><code>logging_function.py</code></a></td><td style="text-align:left">Logs each incoming message</td></tr>
<tr><td style="text-align:left"><a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/python-examples/thumbnailer.py"><code>thumbnailer.py</code></a></td><td style="text-align:left">Takes image data as input and outputs a 128x128 thumbnail of each image</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="python-context-object"></a><a href="#python-context-object" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python context object</h4>
<p>The <a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/context.py"><code>Context</code></a> class provides a number of methods that you can use to access the function's <a href="#context">context</a>. The various methods for the <code>Context</code> class are listed below:</p>
<table>
<thead>
<tr><th style="text-align:left">Method</th><th style="text-align:left">What it provides</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>get_message_id</code></td><td style="text-align:left">The message ID of the message being processed</td></tr>
<tr><td style="text-align:left"><code>get_topic_name</code></td><td style="text-align:left">The input topic of the message being processed</td></tr>
<tr><td style="text-align:left"><code>get_function_name</code></td><td style="text-align:left">The name of the current Pulsar Function</td></tr>
<tr><td style="text-align:left"><code>get_function_id</code></td><td style="text-align:left">The ID of the current Pulsar Function</td></tr>
<tr><td style="text-align:left"><code>get_instance_id</code></td><td style="text-align:left">The ID of the current Pulsar Functions instance</td></tr>
<tr><td style="text-align:left"><code>get_function_version</code></td><td style="text-align:left">The version of the current Pulsar Function</td></tr>
<tr><td style="text-align:left"><code>get_logger</code></td><td style="text-align:left">A logger object that can be used for <a href="#python-logging">logging</a></td></tr>
<tr><td style="text-align:left"><code>get_user_config_value</code></td><td style="text-align:left">Returns the value of a <a href="#python-user-config">user-defined config</a> (or <code>None</code> if the config doesn't exist)</td></tr>
<tr><td style="text-align:left"><code>get_user_config_map</code></td><td style="text-align:left">Returns the entire user-defined config as a dict</td></tr>
<tr><td style="text-align:left"><code>record_metric</code></td><td style="text-align:left">Records a per-key <a href="#python-metrics">metric</a></td></tr>
<tr><td style="text-align:left"><code>publish</code></td><td style="text-align:left">Publishes a message to the specified Pulsar topic</td></tr>
<tr><td style="text-align:left"><code>get_output_serde_class_name</code></td><td style="text-align:left">The name of the output <a href="#python-serde">SerDe</a> class</td></tr>
<tr><td style="text-align:left"><code>ack</code></td><td style="text-align:left"><a href="/docs/en/2.1.0-incubating/reference-terminology#acknowledgment-ack">Acks</a> the message being processed to Pulsar</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="python-serde"></a><a href="#python-serde" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python SerDe</h3>
<p>Pulsar Functions use <a href="#serialization-and-deserialization-serde">SerDe</a> when publishing data to and consuming data from Pulsar topics (this is true of both <a href="#python-native-functions">native</a> functions and <a href="#python-sdk-functions">SDK</a> functions). You can specify the SerDe when <a href="/docs/en/2.1.0-incubating/functions-deploying#cluster-mode">creating</a> or <a href="/docs/en/2.1.0-incubating/functions-deploying#local-run-mode">running</a> functions. Here's an example:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--tenant public \
--namespace default \
--name my_function \
--py my_function.py \
--className my_function.MyFunction \
--customSerdeInputs <span class="hljs-string">'{"input-topic-1":"Serde1","input-topic-2":"Serde2"}'</span> \
--outputSerdeClassName Serde3 \
--output output-topic-1
</code></pre>
<p>In this case, there are two input topics, <code>input-topic-1</code> and <code>input-topic-2</code>, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, <code>output-topic-1</code>, uses the <code>Serde3</code> class for SerDe. At the moment, all Pulsar Function logic, including the processing function and SerDe classes, must be contained within a single Python file.</p>
<p>When using Pulsar Functions for Python, you essentially have three SerDe options:</p>
<ol>
<li>You can use the <a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L70"><code>IdentitySerDe</code></a>, which leaves the data unchanged. The <code>IdentitySerDe</code> is the <strong>default</strong>: if you create or run a function without explicitly specifying a SerDe, this option is used.</li>
<li>You can use the <a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L62"><code>PickleSerDe</code></a>, which uses Python's <a href="https://docs.python.org/3/library/pickle.html"><code>pickle</code></a> for SerDe.</li>
<li>You can create a custom SerDe class by implementing the baseline <a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L50"><code>SerDe</code></a> class, which has just two methods: <a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L53"><code>serialize</code></a> for converting the object into bytes, and <a href="https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L58"><code>deserialize</code></a> for converting bytes into an object of the required application-specific type.</li>
</ol>
<p>The table below shows when you should use each SerDe:</p>
<table>
<thead>
<tr><th style="text-align:left">SerDe option</th><th style="text-align:left">When to use</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>IdentitySerDe</code></td><td style="text-align:left">When you're working with simple types like strings, Booleans, integers, and the like</td></tr>
<tr><td style="text-align:left"><code>PickleSerDe</code></td><td style="text-align:left">When you're working with complex, application-specific types and are comfortable with <code>pickle</code>'s &quot;best effort&quot; approach</td></tr>
<tr><td style="text-align:left">Custom SerDe</td><td style="text-align:left">When you require explicit control over SerDe, potentially for performance or data compatibility purposes</td></tr>
</tbody>
</table>
<h4><a class="anchor" aria-hidden="true" id="python-serde-example"></a><a href="#python-serde-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python SerDe example</h4>
<p>Imagine that you're writing Pulsar Functions in Python that are processing tweet objects. Here's a simple <code>Tweet</code> class:</p>
<pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Tweet</span><span class="hljs-params">(object)</span>:</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">__init__</span><span class="hljs-params">(self, username, tweet_content)</span>:</span>
self.username = username
self.tweet_content = tweet_content
</code></pre>
<p>In order to use this class in Pulsar Functions, you'd have two options:</p>
<ol>
<li>You could specify <code>PickleSerDe</code>, which would apply the <a href="https://docs.python.org/3/library/pickle.html"><code>pickle</code></a> library's SerDe</li>
<li>You could create your own SerDe class. Here's a simple example:</li>
</ol>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> SerDe
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TweetSerDe</span><span class="hljs-params">(SerDe)</span>:</span>
    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">serialize</span><span class="hljs-params">(self, input)</span>:</span>
        <span class="hljs-keyword">return</span> <span class="hljs-string">"{0}|{1}"</span>.format(input.username, input.tweet_content).encode(<span class="hljs-string">'utf-8'</span>)
    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">deserialize</span><span class="hljs-params">(self, input_bytes)</span>:</span>
        tweet_components = input_bytes.decode(<span class="hljs-string">'utf-8'</span>).split(<span class="hljs-string">'|'</span>, <span class="hljs-number">1</span>)
        <span class="hljs-keyword">return</span> Tweet(tweet_components[<span class="hljs-number">0</span>], tweet_components[<span class="hljs-number">1</span>])
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="python-logging"></a><a href="#python-logging" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python logging</h3>
<p>Pulsar Functions that use the <a href="#python-sdk-functions">Python SDK</a> have access to a logging object that can be used to produce logs at the chosen log level. Here's a simple example function that logs either a <code>WARNING</code>- or <code>INFO</code>-level log based on whether the incoming string contains the word <code>danger</code>:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LoggingFunction</span><span class="hljs-params">(Function)</span>:</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span>
logger = context.get_logger()
msg_id = context.get_message_id()
<span class="hljs-keyword">if</span> <span class="hljs-string">'danger'</span> <span class="hljs-keyword">in</span> input:
logger.warning(<span class="hljs-string">"A warning was received in message {0}"</span>.format(msg_id))
<span class="hljs-keyword">else</span>:
logger.info(<span class="hljs-string">"Message {0} received\nContent: {1}"</span>.format(msg_id, input))
</code></pre>
<p>If you want your function to produce logs on a Pulsar topic, you need to specify a <strong>log topic</strong> when creating or running the function. Here's an example:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--py logging_function.py \
--className logging_function.LoggingFunction \
--logTopic logging-function-logs \
<span class="hljs-comment"># Other function configs</span>
</code></pre>
<p>Now, all logs produced by the <code>LoggingFunction</code> above can be accessed via the <code>logging-function-logs</code> topic.</p>
<h3><a class="anchor" aria-hidden="true" id="python-user-config"></a><a href="#python-user-config" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python user config</h3>
<p>The Python SDK's <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to the Pulsar Function via the command line (as JSON). Here's an example function creation command that passes a key/value pair:</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--userConfig <span class="hljs-string">'{"word-of-the-day":"verdure"}'</span> \
<span class="hljs-comment"># Other function configs</span>
</code></pre>
<p>To access that value in a Python function:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UserConfigFunction</span><span class="hljs-params">(Function)</span>:</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span>
logger = context.get_logger()
wotd = context.get_user_config_value(<span class="hljs-string">'word-of-the-day'</span>)
<span class="hljs-keyword">if</span> wotd <span class="hljs-keyword">is</span> <span class="hljs-literal">None</span>:
logger.warning(<span class="hljs-string">'No word of the day provided'</span>)
<span class="hljs-keyword">else</span>:
logger.info(<span class="hljs-string">"The word of the day is {0}"</span>.format(wotd))
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="python-metrics"></a><a href="#python-metrics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Python metrics</h3>
<p>You can record metrics using the <a href="#context"><code>Context</code></a> object on a per-key basis. You can, for example, set a metric for the key <code>hit-count</code> and a different metric for the key <code>elevens-count</code> every time the function processes a message. Here's an example:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function
<span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MetricRecorderFunction</span><span class="hljs-params">(Function)</span>:</span>
<span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span>
context.record_metric(<span class="hljs-string">'hit-count'</span>, <span class="hljs-number">1</span>)
<span class="hljs-keyword">if</span> input == <span class="hljs-number">11</span>:
context.record_metric(<span class="hljs-string">'elevens-count'</span>, <span class="hljs-number">1</span>)
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.1.0-incubating/functions-quickstart"><span class="arrow-prev"></span><span>functions-quickstart</span></a><a class="docs-next button" href="/docs/en/2.1.0-incubating/functions-deploying"><span>functions-deploying</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#core-programming-model">Core programming model</a><ul class="toc-headings"><li><a href="#example-function">Example function</a></li><li><a href="#example-deployment">Example deployment</a></li><li><a href="#available-apis">Available APIs</a></li><li><a href="#serialization-and-deserialization-serde">Serialization and deserialization (SerDe)</a></li><li><a href="#context">Context</a></li><li><a href="#user-config">User config</a></li></ul></li><li><a href="#functions-for-java">Functions for Java</a><ul class="toc-headings"><li><a href="#getting-started">Getting started</a></li><li><a href="#java-native-functions">Java native functions</a></li><li><a href="#java-sdk-functions">Java SDK functions</a></li><li><a href="#java-context-object">Java context object</a></li><li><a href="#void-functions">Void functions</a></li><li><a href="#java-serde">Java SerDe</a></li><li><a href="#java-logging">Java logging</a></li><li><a href="#java-user-config">Java user config</a></li><li><a href="#java-metrics">Java metrics</a></li></ul></li><li><a href="#functions-for-python">Functions for Python</a><ul class="toc-headings"><li><a href="#getting-started-1">Getting started</a></li><li><a href="#packaging-1">Packaging</a></li><li><a href="#python-native-functions">Python native functions</a></li><li><a href="#python-sdk-functions">Python SDK functions</a></li><li><a href="#python-serde">Python SerDe</a></li><li><a href="#python-logging">Python logging</a></li><li><a href="#python-user-config">Python user config</a></li><li><a href="#python-metrics">Python 
metrics</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Replace the header's static "Community" nav entry with a dropdown menu that
// toggles open/closed on click. Runs once, when this footer script is parsed.

// Guarded lookup: when the page has no a[href='#community'] link, `community`
// is null and the menu is simply not installed, instead of throwing a
// TypeError on `.parentNode` of null.
const community = (document.querySelector("a[href='#community']") || {}).parentNode || null;
const communityMenu =
  '<li>' +
  // aria-expanded/aria-controls expose the open/closed state to assistive
  // technology; the arrow glyph is decorative, so it is hidden from it.
  '<a id="community-menu" href="#" aria-expanded="false" aria-controls="community-dropdown">Community <span style="font-size: 0.75em" aria-hidden="true">&nbsp;▼</span></a>' +
  '<div id="community-dropdown" class="hide">' +
  '<ul id="community-dropdown-items">' +
  '<li><a href="/en/contact">Contact</a></li>' +
  '<li><a href="/en/contributing">Contributing</a></li>' +
  '<li><a href="/en/coding-guide">Coding guide</a></li>' +
  '<li><a href="/en/events">Events</a></li>' +
  // External links open in a new tab; rel="noopener noreferrer" prevents the
  // opened page from reaching back into this one via window.opener.
  '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
  '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
  '<li>&nbsp;</li>' +
  '<li><a href="/en/resources">Resources</a></li>' +
  '<li><a href="/en/team">Team</a></li>' +
  '<li><a href="/en/powered-by">Powered By</a></li>' +
  '</ul>' +
  '</div>' +
  '</li>';
if (community) {
  community.innerHTML = communityMenu;
}
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
// Only wire the toggle when the menu markup was actually installed above.
if (communityMenuItem && communityDropDown) {
  communityMenuItem.addEventListener("click", function (event) {
    // The anchor acts purely as a toggle; don't navigate to "#".
    event.preventDefault();
    // Anything other than exactly 'hide' is treated as open, so it closes.
    const isOpening = communityDropDown.className === 'hide';
    communityDropDown.className = isOpening ? 'visible' : 'hide';
    communityMenuItem.setAttribute('aria-expanded', isOpening ? 'true' : 'false');
  });
}
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>