blob: ade0d40f9aa760d9232750e802fa9f1dda59961b [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Develop Pulsar Functions · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="You learn how to develop Pulsar Functions with different APIs for Java, Python and Go."/><meta name="docsearch:version" content="2.9.2"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Develop Pulsar Functions · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="You learn how to develop Pulsar Functions with different APIs for Java, Python and Go."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" 
src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.9.2</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.9.2/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.9.2/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.9.2/functions-develop">日本語</a></li><li><a href="/docs/fr/2.9.2/functions-develop">Français</a></li><li><a href="/docs/ko/2.9.2/functions-develop">한국어</a></li><li><a href="/docs/zh-CN/2.9.2/functions-develop">中文</a></li><li><a href="/docs/zh-TW/2.9.2/functions-develop">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language switcher: a click on the #languages-menu link flips the
// #languages-dropdown element between the "hide" and "visible" class names.
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function (event) {
  // The menu link points at "#"; cancel the default navigation.
  event.preventDefault();
  const isHidden = languagesDropDown.className == "hide";
  languagesDropDown.className = isHidden ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Pulsar Functions</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-topic-compaction">Topic 
Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.9.2/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 
class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/tiered-storage-azure">Azure BlobStore offloader</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.9.2/tiered-storage-aliyun">Aliyun OSS offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.9.2/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.9.2/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-extending">Extending</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/client-libraries-dotnet">C#</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-overview">Overview</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-message-queue">Message 
queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/develop-load-manager">Modular load manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.9.2/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
// Sidebar categories marked with the `collapsible` class.
// 1. For the first category whose following sibling contains an element with
//    `navListItemActive`, toggle `hide` on that sibling and `rotate` on the
//    header's second child node.
// 2. Give every category header a click handler that toggles the same two
//    classes.
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
  var links = coll[i].nextElementSibling.getElementsByTagName('*');
  // The flag is cleared on the first match, so the scan stops there and is
  // skipped entirely for all later categories.
  var j = 0;
  while (checkActiveCategory && j < links.length) {
    if (links[j].classList.contains('navListItemActive')) {
      coll[i].nextElementSibling.classList.toggle('hide');
      coll[i].childNodes[1].classList.toggle('rotate');
      checkActiveCategory = false;
    }
    j++;
  }
  coll[i].addEventListener('click', function () {
    // `this` is the clicked category header.
    this.childNodes[1].classList.toggle('rotate');
    this.nextElementSibling.classList.toggle('hide');
  });
}
// After the DOM is parsed: wire the nav and TOC toggler elements, and clear
// the `tocActive` class from <body> when a link inside `.toc-headings` is
// clicked.
document.addEventListener('DOMContentLoaded', function() {
  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');

  var headings = document.querySelector('.toc-headings');
  if (headings) {
    headings.addEventListener('click', function(event) {
      // Walk from the clicked node up to (but not including) the container,
      // looking for an anchor. The `el &&` guard stops the walk if the chain
      // ends at null (e.g. the clicked node was detached from the container)
      // instead of throwing on `null.tagName`.
      for (var el = event.target; el && el !== headings; el = el.parentNode) {
        if (el.tagName === 'A') {
          document.body.classList.remove('tocActive');
          break;
        }
      }
    }, false);
  }

  /**
   * Makes a click on one element toggle a class on another element.
   *
   * Does nothing when either selector matches no element, so a page that
   * lacks the toggler or its target never gets a handler that would throw.
   *
   * @param {string} togglerSelector CSS selector of the element to click.
   * @param {string} targetSelector CSS selector of the element whose class list is toggled.
   * @param {string} className Class name to toggle on the target.
   */
  function createToggler(togglerSelector, targetSelector, className) {
    var toggler = document.querySelector(togglerSelector);
    var target = document.querySelector(targetSelector);
    if (!toggler || !target) {
      return;
    }
    toggler.onclick = function(event) {
      event.preventDefault();
      target.classList.toggle(className);
    };
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/functions-develop.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Develop Pulsar Functions</h1></header><article><div><span><p>You learn how to develop Pulsar Functions with different APIs for Java, Python and Go.</p>
<h2><a class="anchor" aria-hidden="true" id="available-apis"></a><a href="#available-apis" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Available APIs</h2>
<p>In Java and Python, you have two options to write Pulsar Functions. In Go, you can use Pulsar Functions SDK for Go.</p>
<table>
<thead>
<tr><th style="text-align:left">Interface</th><th style="text-align:left">Description</th><th style="text-align:left">Use cases</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Language-native interface</td><td style="text-align:left">No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python).</td><td style="text-align:left">Functions that do not require access to the function <a href="#context">context</a>.</td></tr>
<tr><td style="text-align:left">Pulsar Function SDK for Java/Python/Go</td><td style="text-align:left">Pulsar-specific libraries that provide a range of functionality not provided by &quot;native&quot; interfaces.</td><td style="text-align:left">Functions that require access to the function <a href="#context">context</a>.</td></tr>
</tbody>
</table>
<p>The language-native function, which adds an exclamation point to all incoming strings and publishes the resulting string to a topic, has no external dependencies. The following example is a language-native function.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2027-tab-2028" class="nav-link active" data-group="group_2027" data-tab="tab-group-2027-content-2028">Java</div><div id="tab-group-2027-tab-2029" class="nav-link" data-group="group_2027" data-tab="tab-group-2027-content-2029">Python</div></div><div class="tab-content"><div id="tab-group-2027-content-2028" class="tab-pane active" data-group="group_2027" tabindex="-1"><div><span><pre><code class="hljs css language-Java"><span class="hljs-keyword">import</span> java.util.function.Function;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">JavaNativeExclamationFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{<br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">apply</span><span class="hljs-params">(String input)</span> </span>{<br /> <span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s!"</span>, input);<br /> }<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclamationFunction.java">here</a>.</p>
</span></div></div><div id="tab-group-2027-content-2029" class="tab-pane" data-group="group_2027" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(input)</span>:</span><br /> <span class="hljs-keyword">return</span> <span class="hljs-string">"{}!"</span>.format(input)<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/native_exclamation_function.py">here</a>.</p>
<blockquote>
<p>Note
You can write Pulsar Functions in python2 or python3. However, Pulsar only looks for <code>python</code> as the interpreter.</p>
<p>If you're running Pulsar Functions on an Ubuntu system that only supports python3, you might fail to
start the functions. In this case, you can create a symlink. Your system will fail if
you subsequently install any other package that depends on Python 2.x. A solution is under development in <a href="https://github.com/apache/pulsar/issues/5518">Issue 5518</a>.</p>
<pre><code class="hljs css language-bash">sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10<br /></code></pre>
</blockquote>
</span></div></div></div></div>
<p>The following example uses the Pulsar Functions SDK.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2030-tab-2031" class="nav-link active" data-group="group_2030" data-tab="tab-group-2030-content-2031">Java</div><div id="tab-group-2030-tab-2032" class="nav-link" data-group="group_2030" data-tab="tab-group-2030-content-2032">Python</div><div id="tab-group-2030-tab-2033" class="nav-link" data-group="group_2030" data-tab="tab-group-2030-content-2033">Go</div></div><div class="tab-content"><div id="tab-group-2030-content-2031" class="tab-pane active" data-group="group_2030" tabindex="-1"><div><span><pre><code class="hljs css language-Java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ExclamationFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{<br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{<br /> <span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s!"</span>, input);<br /> }<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java">here</a>.</p>
</span></div></div><div id="tab-group-2030-content-2032" class="tab-pane" data-group="group_2030" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ExclamationFunction</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">__init__</span><span class="hljs-params">(self)</span>:</span><br /> <span class="hljs-keyword">pass</span><br /><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br /> <span class="hljs-keyword">return</span> input + <span class="hljs-string">'!'</span><br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/exclamation_function.py">here</a>.</p>
</span></div></div><div id="tab-group-2030-content-2033" class="tab-pane" data-group="group_2030" tabindex="-1"><div><span><pre><code class="hljs css language-Go"><span class="hljs-keyword">package</span> main<br /><br /><span class="hljs-keyword">import</span> (<br /> <span class="hljs-string">"context"</span><br /> <span class="hljs-string">"fmt"</span><br /><br /> <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/pf"</span><br />)<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">HandleRequest</span><span class="hljs-params">(ctx context.Context, in []<span class="hljs-keyword">byte</span>)</span> <span class="hljs-title">error</span></span>{<br /> fmt.Println(<span class="hljs-keyword">string</span>(in) + <span class="hljs-string">"!"</span>)<br /> <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {<br /> pf.Start(HandleRequest)<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/77cf09eafa4f1626a53a1fe2e65dd25f377c1127/pulsar-function-go/examples/inputFunc/inputFunc.go#L20-L36">here</a>.</p>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="schema-registry"></a><a href="#schema-registry" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema registry</h2>
<p>Pulsar has a built-in schema registry and is bundled with popular schema types, such as Avro, JSON and Protobuf. Pulsar Functions can leverage the existing schema information from input topics and derive the input type. The schema registry applies to output topics as well.</p>
<h2><a class="anchor" aria-hidden="true" id="serde"></a><a href="#serde" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>SerDe</h2>
<p>SerDe stands for <strong>Ser</strong>ialization and <strong>De</strong>serialization. Pulsar Functions uses SerDe when publishing data to and consuming data from Pulsar topics. How SerDe works by default depends on the language you use for a particular function.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2034-tab-2035" class="nav-link active" data-group="group_2034" data-tab="tab-group-2034-content-2035">Java</div><div id="tab-group-2034-tab-2036" class="nav-link" data-group="group_2034" data-tab="tab-group-2034-content-2036">Python</div><div id="tab-group-2034-tab-2037" class="nav-link" data-group="group_2034" data-tab="tab-group-2034-content-2037">Go</div></div><div class="tab-content"><div id="tab-group-2034-content-2035" class="tab-pane active" data-group="group_2034" tabindex="-1"><div><span><p>When you write Pulsar Functions in Java, the following basic Java types are built in and supported by default: <code>String</code>, <code>Double</code>, <code>Integer</code>, <code>Float</code>, <code>Long</code>, <code>Short</code>, and <code>Byte</code>.</p>
<p>To customize Java types, you need to implement the following interface.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">SerDe</span>&lt;<span class="hljs-title">T</span>&gt; </span>{<br /> <span class="hljs-function">T <span class="hljs-title">deserialize</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] input)</span></span>;<br /> <span class="hljs-keyword">byte</span>[] serialize(T input);<br />}<br /></code></pre>
<p>SerDe works in the following ways in Java Functions.</p>
<ul>
<li>If the input and output topics have schema, Pulsar Functions use schema for SerDe.</li>
<li>If the input or output topics do not exist, Pulsar Functions adopt the following rules to determine SerDe:
<ul>
<li>If the schema type is specified, Pulsar Functions use the specified schema type.</li>
<li>If SerDe is specified, Pulsar Functions use the specified SerDe, and the schema type for input and output topics is <code>Byte</code>.</li>
<li>If neither the schema type nor SerDe is specified, Pulsar Functions use the built-in SerDe. For non-primitive schema types, the built-in SerDe serializes and deserializes objects in the <code>JSON</code> format.</li>
</ul></li>
</ul>
</span></div></div><div id="tab-group-2034-content-2036" class="tab-pane" data-group="group_2034" tabindex="-1"><div><span><p>In Python, the default SerDe is identity, meaning that the type is serialized as whatever type the producer function returns.</p>
<p>You can specify the SerDe when <a href="/docs/en/2.9.2/functions-deploy#cluster-mode">creating</a> or <a href="/docs/en/2.9.2/functions-deploy#local-run-mode">running</a> functions.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> --tenant public \<br /> --namespace default \<br /> --name my_function \<br /> --py my_function.py \<br /> --classname my_function.MyFunction \<br /> --custom-serde-inputs <span class="hljs-string">'{"input-topic-1":"Serde1","input-topic-2":"Serde2"}'</span> \<br /> --output-serde-classname Serde3 \<br /> --output output-topic-1<br /></code></pre>
<p>This example contains two input topics, <code>input-topic-1</code> and <code>input-topic-2</code>, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, <code>output-topic-1</code>, uses the <code>Serde3</code> class for SerDe. At the moment, all Pulsar Functions logic, including the processing function and SerDe classes, must be contained within a single Python file.</p>
<p>When using Pulsar Functions for Python, you have three SerDe options:</p>
<ol>
<li>You can use the <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L70"><code>IdentitySerDe</code></a>, which leaves the data unchanged. The <code>IdentitySerDe</code> is the <strong>default</strong>. Creating or running a function without explicitly specifying SerDe means that this option is used.</li>
<li>You can use the <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L62"><code>PickleSerDe</code></a>, which uses Python <a href="https://docs.python.org/3/library/pickle.html"><code>pickle</code></a> for SerDe.</li>
<li>You can create a custom SerDe class by implementing the baseline <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L50"><code>SerDe</code></a> class, which has just two methods: <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L53"><code>serialize</code></a> for converting the object into bytes, and <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L58"><code>deserialize</code></a> for converting bytes into an object of the required application-specific type.</li>
</ol>
<p>The table below shows when you should use each SerDe.</p>
<table>
<thead>
<tr><th style="text-align:left">SerDe option</th><th style="text-align:left">When to use</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>IdentitySerDe</code></td><td style="text-align:left">When you work with simple types like strings, Booleans, integers.</td></tr>
<tr><td style="text-align:left"><code>PickleSerDe</code></td><td style="text-align:left">When you work with complex, application-specific types and are comfortable with the &quot;best effort&quot; approach of <code>pickle</code>.</td></tr>
<tr><td style="text-align:left">Custom SerDe</td><td style="text-align:left">When you require explicit control over SerDe, potentially for performance or data compatibility purposes.</td></tr>
</tbody>
</table>
</span></div></div><div id="tab-group-2034-content-2037" class="tab-pane" data-group="group_2034" tabindex="-1"><div><span><p>Currently, the feature is not available in Go.</p>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<p>Imagine that you're writing Pulsar Functions that process tweet objects. You can refer to the following example of the <code>Tweet</code> class.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2038-tab-2039" class="nav-link active" data-group="group_2038" data-tab="tab-group-2038-content-2039">Java</div><div id="tab-group-2038-tab-2040" class="nav-link" data-group="group_2038" data-tab="tab-group-2038-content-2040">Python</div></div><div class="tab-content"><div id="tab-group-2038-content-2039" class="tab-pane active" data-group="group_2038" tabindex="-1"><div><span><pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Tweet</span> </span>{<br /> <span class="hljs-keyword">private</span> String username;<br /> <span class="hljs-keyword">private</span> String tweetContent;<br /><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">Tweet</span><span class="hljs-params">(String username, String tweetContent)</span> </span>{<br /> <span class="hljs-keyword">this</span>.username = username;<br /> <span class="hljs-keyword">this</span>.tweetContent = tweetContent;<br /> }<br /><br /> <span class="hljs-comment">// Standard setters and getters</span><br />}<br /></code></pre>
<p>To pass <code>Tweet</code> objects directly between Pulsar Functions, you need to provide a custom SerDe class. In the example below, <code>Tweet</code> objects are basically strings in which the username and tweet content are separated by a <code>|</code>.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">package</span> com.example.serde;<br /><br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.SerDe;<br /><br /><span class="hljs-keyword">import</span> java.util.regex.Pattern;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TweetSerde</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">SerDe</span>&lt;<span class="hljs-title">Tweet</span>&gt; </span>{<br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Tweet <span class="hljs-title">deserialize</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] input)</span> </span>{<br /> String s = <span class="hljs-keyword">new</span> String(input);<br /> String[] fields = s.split(Pattern.quote(<span class="hljs-string">"|"</span>));<br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Tweet(fields[<span class="hljs-number">0</span>], fields[<span class="hljs-number">1</span>]);<br /> }<br /><br /> <span class="hljs-keyword">public</span> <span class="hljs-keyword">byte</span>[] serialize(Tweet input) {<br /> <span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s|%s"</span>, input.getUsername(), input.getTweetContent()).getBytes();<br /> }<br />}<br /></code></pre>
<p>To apply this customized SerDe to a particular Pulsar Function, you need to:</p>
<ul>
<li>Package the <code>Tweet</code> and <code>TweetSerde</code> classes into a JAR.</li>
<li>Specify a path to the JAR and SerDe class name when deploying the function.</li>
</ul>
<p>The following is an example of <a href="/docs/en/2.9.2/reference-pulsar-admin#create-1"><code>create</code></a> operation.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> --jar /path/to/your.jar \<br /> --output-serde-classname com.example.serde.TweetSerde \<br /> <span class="hljs-comment"># Other function attributes</span><br /></code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="custom-serde-classes-must-be-packaged-with-your-function-jars"></a><a href="#custom-serde-classes-must-be-packaged-with-your-function-jars" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Custom SerDe classes must be packaged with your function JARs</h4>
<p>Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. So you need to include your SerDe classes in your function JARs. If not, Pulsar returns an error.</p>
</blockquote>
</span></div></div><div id="tab-group-2038-content-2040" class="tab-pane" data-group="group_2038" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Tweet</span><span class="hljs-params">(object)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">__init__</span><span class="hljs-params">(self, username, tweet_content)</span>:</span><br /> self.username = username<br /> self.tweet_content = tweet_content<br /></code></pre>
<p>In order to use this class in Pulsar Functions, you have two options:</p>
<ol>
<li>You can specify <code>PickleSerDe</code>, which applies the <a href="https://docs.python.org/3/library/pickle.html"><code>pickle</code></a> library SerDe.</li>
<li>You can create your own SerDe class. The following is an example.</li>
</ol>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> SerDe<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TweetSerDe</span><span class="hljs-params">(SerDe)</span>:</span><br /><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">serialize</span><span class="hljs-params">(self, input)</span>:</span><br /> <span class="hljs-keyword">return</span> <span class="hljs-string">"{0}|{1}"</span>.format(input.username, input.tweet_content).encode(<span class="hljs-string">'utf-8'</span>)<br /><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">deserialize</span><span class="hljs-params">(self, input_bytes)</span>:</span><br /> tweet_components = input_bytes.decode(<span class="hljs-string">'utf-8'</span>).split(<span class="hljs-string">'|'</span>)<br /> <span class="hljs-keyword">return</span> Tweet(tweet_components[<span class="hljs-number">0</span>], tweet_components[<span class="hljs-number">1</span>])<br /></code></pre>
<p>For the complete code, see the <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/custom_object_function.py"><code>custom_object_function.py</code> example</a>.</p>
</span></div></div></div></div>
<p>In both languages, however, you can write custom SerDe logic for more complex, application-specific types.</p>
<h2><a class="anchor" aria-hidden="true" id="context"></a><a href="#context" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Context</h2>
<p>Java, Python and Go SDKs provide access to a <strong>context object</strong> that can be used by a function. This context object provides a wide variety of information and functionality to the function.</p>
<ul>
<li>The name and ID of a Pulsar Function.</li>
<li>The message ID of each message. Each Pulsar message is automatically assigned an ID.</li>
<li>The key, event time, properties and partition key of each message.</li>
<li>The name of the topic to which the message is sent.</li>
<li>The names of all input topics as well as the output topic associated with the function.</li>
<li>The name of the class used for <a href="#serde">SerDe</a>.</li>
<li>The <a href="/docs/en/2.9.2/reference-terminology#tenant">tenant</a> and namespace associated with the function.</li>
<li>The ID of the Pulsar Functions instance running the function.</li>
<li>The version of the function.</li>
<li>The <a href="/docs/en/2.9.2/functions-develop#logger">logger object</a> used by the function, which can be used to create function log messages.</li>
<li>Access to arbitrary <a href="#user-config">user configuration</a> values supplied via the CLI.</li>
<li>An interface for recording <a href="#metrics">metrics</a>.</li>
<li>An interface for storing and retrieving state in <a href="#state-storage">state storage</a>.</li>
<li>A function to publish new messages onto arbitrary topics.</li>
<li>A function to ack the message being processed (if auto-ack is disabled).</li>
<li>(Java) Access to the Pulsar admin client.</li>
</ul>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2041-tab-2042" class="nav-link active" data-group="group_2041" data-tab="tab-group-2041-content-2042">Java</div><div id="tab-group-2041-tab-2043" class="nav-link" data-group="group_2041" data-tab="tab-group-2041-content-2043">Python</div><div id="tab-group-2041-tab-2044" class="nav-link" data-group="group_2041" data-tab="tab-group-2041-content-2044">Go</div></div><div class="tab-content"><div id="tab-group-2041-content-2042" class="tab-pane active" data-group="group_2041" tabindex="-1"><div><span><p>The <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java">Context</a> interface provides a number of methods that you can use to access the function <a href="#context">context</a>. The various method signatures for the <code>Context</code> interface are listed as follows.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">Context</span> </span>{<br /> Record&lt;?&gt; getCurrentRecord();<br /> <span class="hljs-function">Collection&lt;String&gt; <span class="hljs-title">getInputTopics</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getOutputTopic</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getOutputSchemaType</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getTenant</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getNamespace</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getFunctionName</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getFunctionId</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getInstanceId</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">String <span class="hljs-title">getFunctionVersion</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">Logger <span class="hljs-title">getLogger</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">incrCounter</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">incrCounterAsync</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> 
amount)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">getCounter</span><span class="hljs-params">(String key)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">getCounterAsync</span><span class="hljs-params">(String key)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">putState</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">putStateAsync</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">deleteState</span><span class="hljs-params">(String key)</span></span>;<br /> <span class="hljs-function">ByteBuffer <span class="hljs-title">getState</span><span class="hljs-params">(String key)</span></span>;<br /> <span class="hljs-function">ByteBuffer <span class="hljs-title">getStateAsync</span><span class="hljs-params">(String key)</span></span>;<br /> <span class="hljs-function">Map&lt;String, Object&gt; <span class="hljs-title">getUserConfigMap</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">Optional&lt;Object&gt; <span class="hljs-title">getUserConfigValue</span><span class="hljs-params">(String key)</span></span>;<br /> <span class="hljs-function">Object <span class="hljs-title">getUserConfigValueOrDefault</span><span class="hljs-params">(String key, Object defaultValue)</span></span>;<br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">recordMetric</span><span class="hljs-params">(String metricName, <span class="hljs-keyword">double</span> value)</span></span>;<br /> &lt;O&gt; <span class="hljs-function">CompletableFuture&lt;Void&gt; <span 
class="hljs-title">publish</span><span class="hljs-params">(String topicName, O object, String schemaOrSerdeClassName)</span></span>;<br /> &lt;O&gt; <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">publish</span><span class="hljs-params">(String topicName, O object)</span></span>;<br /> &lt;O&gt; <span class="hljs-function">TypedMessageBuilder&lt;O&gt; <span class="hljs-title">newOutputMessage</span><span class="hljs-params">(String topicName, Schema&lt;O&gt; schema)</span> <span class="hljs-keyword">throws</span> PulsarClientException</span>;<br /> &lt;O&gt; <span class="hljs-function">ConsumerBuilder&lt;O&gt; <span class="hljs-title">newConsumerBuilder</span><span class="hljs-params">(Schema&lt;O&gt; schema)</span> <span class="hljs-keyword">throws</span> PulsarClientException</span>;<br /> <span class="hljs-function">PulsarAdmin <span class="hljs-title">getPulsarAdmin</span><span class="hljs-params">()</span></span>;<br /> <span class="hljs-function">PulsarAdmin <span class="hljs-title">getPulsarAdmin</span><span class="hljs-params">(String clusterName)</span></span>;<br />}<br /></code></pre>
<p>The following example uses several methods available via the <code>Context</code> object.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">import</span> java.util.stream.Collectors;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ContextFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{<br /> Logger LOG = context.getLogger();<br /> String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(<span class="hljs-string">", "</span>));<br /> String functionName = context.getFunctionName();<br /><br /> String logMessage = String.format(<span class="hljs-string">"A message with a value of \"%s\" has arrived on one of the following topics: %s\n"</span>,<br /> input,<br /> inputTopics);<br /><br /> LOG.info(logMessage);<br /><br /> String metricName = String.format(<span class="hljs-string">"function-%s-messages-received"</span>, functionName);<br /> context.recordMetric(metricName, <span class="hljs-number">1</span>);<br /><br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
</span></div></div><div id="tab-group-2041-content-2043" class="tab-pane" data-group="group_2041" tabindex="-1"><div><span><pre><code class="hljs"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ContextImpl</span>(<span class="hljs-title">pulsar</span>.<span class="hljs-title">Context</span>):</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_id</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_key</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_eventtime</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_properties</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_current_message_topic_name</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_partition_key</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_name</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_tenant</span><span 
class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_namespace</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_id</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_instance_id</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_version</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_logger</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_user_config_value</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_user_config_map</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">record_metric</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, metric_name, metric_value)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_input_topics</span><span class="hljs-params">(<span 
class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_output_topic</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_output_serde_class_name</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">publish</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, topic_name, message, serde_class_name=<span class="hljs-string">"serde.IdentitySerDe"</span>,<br /> properties=None, compression_type=None, callback=None, message_conf=None)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">ack</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, msgid, topic)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_and_reset_metrics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">reset_metrics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_metrics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">incr_counter</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key, amount)</span></span>:<br /> ...<br /> <span class="hljs-function"><span 
class="hljs-keyword">def</span> <span class="hljs-title">get_counter</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">del_counter</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">put_state</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key, value)</span></span>:<br /> ...<br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_state</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br /> ...<br /></code></pre>
</span></div></div><div id="tab-group-2041-content-2044" class="tab-pane" data-group="group_2041" tabindex="-1"><div><span><pre><code class="hljs"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetInstanceID</span>() int {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.instanceID<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetInputTopics</span>() []string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.inputTopics<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetOutputTopic</span>() string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">GetSink</span>().<span class="hljs-type">Topic</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncTenant</span>() string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">Tenant</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncName</span>() string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">Name</span><br 
/>}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncNamespace</span>() string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">Namespace</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncID</span>() string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcID<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncVersion</span>() string {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcVersion<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetUserConfValue</span>(key string) interface{} {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.userConfigs[key]<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetUserConfMap</span>() <span class="hljs-built_in">map</span>[string]interface{} {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.userConfigs<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span 
class="hljs-type">SetCurrentRecord</span>(record pulsar.<span class="hljs-type">Message</span>) {<br /> <span class="hljs-built_in">c</span>.record = record<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetCurrentRecord</span>() pulsar.<span class="hljs-type">Message</span> {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.record<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">NewOutputMessage</span>(topic string) pulsar.<span class="hljs-type">Producer</span> {<br /> <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.outputMessage(topic)<br />}<br /></code></pre>
<p>The following example uses several methods available via the <code>Context</code> object.</p>
<pre><code class="hljs"><span class="hljs-keyword">import</span> (<br /> <span class="hljs-string">"context"</span><br /> <span class="hljs-string">"fmt"</span><br /><br /> <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/pf"</span><br />)<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">contextFunc</span><span class="hljs-params">(ctx context.Context)</span></span> {<br /> <span class="hljs-keyword">if</span> fc, ok := pf.FromContext(ctx); ok {<br /> fmt.Printf(<span class="hljs-string">"function ID is:%s, "</span>, fc.GetFuncID())<br /> fmt.Printf(<span class="hljs-string">"function version is:%s\n"</span>, fc.GetFuncVersion())<br /> }<br />}<br /></code></pre>
<p>For the complete code, see the <a href="https://github.com/apache/pulsar/blob/77cf09eafa4f1626a53a1fe2e65dd25f377c1127/pulsar-function-go/examples/contextFunc/contextFunc.go#L29-L34">contextFunc example on GitHub</a>.</p>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="user-config"></a><a href="#user-config" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>User config</h3>
<p>When you run or update Pulsar Functions created using the SDK, you can pass arbitrary key/value pairs to them on the command line with the <code>--user-config</code> flag. Key/value pairs must be specified as JSON. The following function creation command passes a user-configured key/value pair to a function.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
--name word-filter \
<span class="hljs-comment"># Other function configs</span>
--user-config <span class="hljs-string">'{"forbidden-word":"rosebud"}'</span>
</code></pre>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2045-tab-2046" class="nav-link active" data-group="group_2045" data-tab="tab-group-2045-content-2046">Java</div><div id="tab-group-2045-tab-2047" class="nav-link" data-group="group_2045" data-tab="tab-group-2045-content-2047">Python</div><div id="tab-group-2045-tab-2048" class="nav-link" data-group="group_2045" data-tab="tab-group-2045-content-2048">Go</div></div><div class="tab-content"><div id="tab-group-2045-content-2046" class="tab-pane active" data-group="group_2045" tabindex="-1"><div><span><p>The Java SDK <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> <span class="hljs-comment"># Other function configs</span><br /> --user-config <span class="hljs-string">'{"word-of-the-day":"verdure"}'</span><br /></code></pre>
<p>To access that value in a Java function:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">import</span> java.util.Optional;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UserConfigFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{<br /> Logger LOG = context.getLogger();<br /> Optional&lt;String&gt; wotd = context.getUserConfigValue(<span class="hljs-string">"word-of-the-day"</span>);<br /> <span class="hljs-keyword">if</span> (wotd.isPresent()) {<br /> LOG.info(<span class="hljs-string">"The word of the day is {}"</span>, wotd.get());<br /> } <span class="hljs-keyword">else</span> {<br /> LOG.warn(<span class="hljs-string">"No word of the day provided"</span>);<br /> }<br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
<p>The <code>UserConfigFunction</code> function will log the string <code>&quot;The word of the day is verdure&quot;</code> every time the function is invoked (which means every time a message arrives). The <code>word-of-the-day</code> user config will be changed only when the function is updated with a new config value via the command line.</p>
<p>You can also access the entire user config map or set a default value in case no value is present:</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Get the whole config map</span><br />Map&lt;String, String&gt; allConfigs = context.getUserConfigMap();<br /><br /><span class="hljs-comment">// Get value or resort to default</span><br />String wotd = context.getUserConfigValueOrDefault(<span class="hljs-string">"word-of-the-day"</span>, <span class="hljs-string">"perspicacious"</span>);<br /></code></pre>
<blockquote>
<p>For all key/value pairs passed to Java functions, both the key <em>and</em> the value are <code>String</code>. To set the value to be a different type, you need to deserialize from the <code>String</code> type.</p>
</blockquote>
</span></div></div><div id="tab-group-2045-content-2047" class="tab-pane" data-group="group_2045" tabindex="-1"><div><span><p>In a Python function, you can access the configuration value like this.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordFilter</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br /> forbidden_word = context.user_config()[<span class="hljs-string">"forbidden-word"</span>]<br /><br /> <span class="hljs-comment"># Don't publish the message if it contains the user-supplied</span><br /> <span class="hljs-comment"># forbidden word</span><br /> <span class="hljs-keyword">if</span> forbidden_word <span class="hljs-keyword">in</span> input:<br /> <span class="hljs-keyword">pass</span><br /> <span class="hljs-comment"># Otherwise publish the message</span><br /> <span class="hljs-keyword">else</span>:<br /> <span class="hljs-keyword">return</span> input<br /></code></pre>
<p>The Python SDK <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> <span class="hljs-comment"># Other function configs \</span><br /> --user-config <span class="hljs-string">'{"word-of-the-day":"verdure"}'</span><br /></code></pre>
<p>To access that value in a Python function:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UserConfigFunction</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br /> logger = context.get_logger()<br /> wotd = context.get_user_config_value(<span class="hljs-string">'word-of-the-day'</span>)<br /> <span class="hljs-keyword">if</span> wotd <span class="hljs-keyword">is</span> <span class="hljs-literal">None</span>:<br /> logger.warn(<span class="hljs-string">'No word of the day provided'</span>)<br /> <span class="hljs-keyword">else</span>:<br /> logger.info(<span class="hljs-string">"The word of the day is {0}"</span>.format(wotd))<br /></code></pre>
</span></div></div><div id="tab-group-2045-content-2048" class="tab-pane" data-group="group_2045" tabindex="-1"><div><span><p>The Go SDK <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> --go path/to/go/binary \<br /> --user-config <span class="hljs-string">'{"word-of-the-day":"lackadaisical"}'</span><br /></code></pre>
<p>To access that value in a Go function:</p>
<pre><code class="hljs css language-go"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">contextFunc</span><span class="hljs-params">(ctx context.Context)</span></span> {<br /> fc, ok := pf.FromContext(ctx)<br /> <span class="hljs-keyword">if</span> !ok {<br /> logutil.Fatal(<span class="hljs-string">"Function context is not defined"</span>)<br /> }<br /><br /> wotd := fc.GetUserConfValue(<span class="hljs-string">"word-of-the-day"</span>)<br /><br /> <span class="hljs-keyword">if</span> wotd == <span class="hljs-literal">nil</span> {<br /> logutil.Warn(<span class="hljs-string">"The word of the day is empty"</span>)<br /> } <span class="hljs-keyword">else</span> {<br /> logutil.Infof(<span class="hljs-string">"The word of the day is %s"</span>, wotd.(<span class="hljs-keyword">string</span>))<br /> }<br />}<br /></code></pre>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="logger"></a><a href="#logger" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Logger</h3>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2049-tab-2050" class="nav-link active" data-group="group_2049" data-tab="tab-group-2049-content-2050">Java</div><div id="tab-group-2049-tab-2051" class="nav-link" data-group="group_2049" data-tab="tab-group-2049-content-2051">Python</div><div id="tab-group-2049-tab-2052" class="nav-link" data-group="group_2049" data-tab="tab-group-2049-content-2052">Go</div></div><div class="tab-content"><div id="tab-group-2049-content-2050" class="tab-pane active" data-group="group_2049" tabindex="-1"><div><span><p>Pulsar Functions that use the Java SDK have access to an <a href="https://www.slf4j.org/">SLF4J</a> <a href="https://www.slf4j.org/api/org/slf4j/Logger.html"><code>Logger</code></a> object that can be used to produce logs at the chosen log level. The following example logs either a <code>WARN</code>- or <code>INFO</code>-level message based on whether the incoming string contains the word <code>danger</code>.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LoggingFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{<br /> Logger LOG = context.getLogger();<br /> String messageId = <span class="hljs-keyword">new</span> String(context.getMessageId());<br /><br /> <span class="hljs-keyword">if</span> (input.contains(<span class="hljs-string">"danger"</span>)) {<br /> LOG.warn(<span class="hljs-string">"A warning was received in message {}"</span>, messageId);<br /> } <span class="hljs-keyword">else</span> {<br /> LOG.info(<span class="hljs-string">"Message {} received\nContent: {}"</span>, messageId, input);<br /> }<br /><br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
<p>If you want your function to produce logs, you need to specify a log topic when creating or running the function. The following is an example.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> --jar my-functions.jar \<br /> --classname my.package.LoggingFunction \<br /> --<span class="hljs-built_in">log</span>-topic persistent://public/default/logging-function-logs \<br /> <span class="hljs-comment"># Other function configs</span><br /></code></pre>
<p>All logs produced by <code>LoggingFunction</code> above can be accessed via the <code>persistent://public/default/logging-function-logs</code> topic.</p>
<h4><a class="anchor" aria-hidden="true" id="customize-function-log-level"></a><a href="#customize-function-log-level" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Customize Function log level</h4>
<p>Additionally, you can use the XML file <code>functions_log4j2.xml</code> to customize the function log level.
To do so, create or update <code>functions_log4j2.xml</code> in your Pulsar conf directory (for example, <code>/etc/pulsar/</code> on bare-metal, or <code>/pulsar/conf</code> on Kubernetes) so that it contains contents such as:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">Configuration</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>pulsar-functions-instance<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">monitorInterval</span>&gt;</span>30<span class="hljs-tag">&lt;/<span class="hljs-name">monitorInterval</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Properties</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>pulsar.log.appender<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">value</span>&gt;</span>RollingFile<span class="hljs-tag">&lt;/<span class="hljs-name">value</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>pulsar.log.level<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">value</span>&gt;</span>debug<span class="hljs-tag">&lt;/<span class="hljs-name">value</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>bk.log.level<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">value</span>&gt;</span>debug<span class="hljs-tag">&lt;/<span class="hljs-name">value</span>&gt;</span><br /> <span 
class="hljs-tag">&lt;/<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Properties</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Appenders</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Console</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>Console<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">target</span>&gt;</span>SYSTEM_OUT<span class="hljs-tag">&lt;/<span class="hljs-name">target</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Pattern</span>&gt;</span>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n<span class="hljs-tag">&lt;/<span class="hljs-name">Pattern</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Console</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">RollingFile</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>RollingFile<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">fileName</span>&gt;</span>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log<span class="hljs-tag">&lt;/<span class="hljs-name">fileName</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">filePattern</span>&gt;</span>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz<span class="hljs-tag">&lt;/<span class="hljs-name">filePattern</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">immediateFlush</span>&gt;</span>true<span 
class="hljs-tag">&lt;/<span class="hljs-name">immediateFlush</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Pattern</span>&gt;</span>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n<span class="hljs-tag">&lt;/<span class="hljs-name">Pattern</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Policies</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">TimeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">interval</span>&gt;</span>1<span class="hljs-tag">&lt;/<span class="hljs-name">interval</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">modulate</span>&gt;</span>true<span class="hljs-tag">&lt;/<span class="hljs-name">modulate</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">TimeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">SizeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">size</span>&gt;</span>1 GB<span class="hljs-tag">&lt;/<span class="hljs-name">size</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">SizeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">CronTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">schedule</span>&gt;</span>0 0 0 * * ?<span class="hljs-tag">&lt;/<span class="hljs-name">schedule</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">CronTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Policies</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span 
class="hljs-name">DefaultRolloverStrategy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Delete</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">basePath</span>&gt;</span>${sys:pulsar.function.log.dir}<span class="hljs-tag">&lt;/<span class="hljs-name">basePath</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">maxDepth</span>&gt;</span>2<span class="hljs-tag">&lt;/<span class="hljs-name">maxDepth</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">IfFileName</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">glob</span>&gt;</span>*/${sys:pulsar.function.log.file}*log.gz<span class="hljs-tag">&lt;/<span class="hljs-name">glob</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">IfFileName</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">IfLastModified</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">age</span>&gt;</span>30d<span class="hljs-tag">&lt;/<span class="hljs-name">age</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">IfLastModified</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Delete</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">DefaultRolloverStrategy</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">RollingFile</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">RollingRandomAccessFile</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>BkRollingFile<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">fileName</span>&gt;</span>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk<span class="hljs-tag">&lt;/<span class="hljs-name">fileName</span>&gt;</span><br /> <span 
class="hljs-tag">&lt;<span class="hljs-name">filePattern</span>&gt;</span>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz<span class="hljs-tag">&lt;/<span class="hljs-name">filePattern</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">immediateFlush</span>&gt;</span>true<span class="hljs-tag">&lt;/<span class="hljs-name">immediateFlush</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Pattern</span>&gt;</span>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n<span class="hljs-tag">&lt;/<span class="hljs-name">Pattern</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Policies</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">TimeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">interval</span>&gt;</span>1<span class="hljs-tag">&lt;/<span class="hljs-name">interval</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">modulate</span>&gt;</span>true<span class="hljs-tag">&lt;/<span class="hljs-name">modulate</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">TimeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">SizeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">size</span>&gt;</span>1 GB<span class="hljs-tag">&lt;/<span class="hljs-name">size</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">SizeBasedTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">CronTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">schedule</span>&gt;</span>0 0 0 * * 
?<span class="hljs-tag">&lt;/<span class="hljs-name">schedule</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">CronTriggeringPolicy</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Policies</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">DefaultRolloverStrategy</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Delete</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">basePath</span>&gt;</span>${sys:pulsar.function.log.dir}<span class="hljs-tag">&lt;/<span class="hljs-name">basePath</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">maxDepth</span>&gt;</span>2<span class="hljs-tag">&lt;/<span class="hljs-name">maxDepth</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">IfFileName</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">glob</span>&gt;</span>*/${sys:pulsar.function.log.file}.bk*log.gz<span class="hljs-tag">&lt;/<span class="hljs-name">glob</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">IfFileName</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">IfLastModified</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">age</span>&gt;</span>30d<span class="hljs-tag">&lt;/<span class="hljs-name">age</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">IfLastModified</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Delete</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">DefaultRolloverStrategy</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">RollingRandomAccessFile</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Appenders</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Loggers</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span 
class="hljs-name">Logger</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>${sys:bk.log.level}<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">additivity</span>&gt;</span>false<span class="hljs-tag">&lt;/<span class="hljs-name">additivity</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">ref</span>&gt;</span>BkRollingFile<span class="hljs-tag">&lt;/<span class="hljs-name">ref</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Logger</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Root</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>${sys:pulsar.log.level}<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">ref</span>&gt;</span>${sys:pulsar.log.appender}<span class="hljs-tag">&lt;/<span class="hljs-name">ref</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>${sys:pulsar.log.level}<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Root</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Loggers</span>&gt;</span><br /><span 
class="hljs-tag">&lt;/<span class="hljs-name">Configuration</span>&gt;</span><br /></code></pre>
<p>The properties set like:</p>
<pre><code class="hljs css language-xml"> <span class="hljs-tag">&lt;<span class="hljs-name">Property</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>pulsar.log.level<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">value</span>&gt;</span>debug<span class="hljs-tag">&lt;/<span class="hljs-name">value</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Property</span>&gt;</span><br /></code></pre>
<p>propagate to places where they are referenced, such as:</p>
<pre><code class="hljs css language-xml"> <span class="hljs-tag">&lt;<span class="hljs-name">Root</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>${sys:pulsar.log.level}<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">ref</span>&gt;</span>${sys:pulsar.log.appender}<span class="hljs-tag">&lt;/<span class="hljs-name">ref</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>${sys:pulsar.log.level}<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Root</span>&gt;</span><br /></code></pre>
<p>In the above example, debug level logging would be applied to ALL function logs.
This may be more verbose than you desire. To be more selective, you can apply different log levels to different classes or modules. For example:</p>
<pre><code class="hljs css language-xml"> <span class="hljs-tag">&lt;<span class="hljs-name">Logger</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>com.example.module<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>info<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">additivity</span>&gt;</span>false<span class="hljs-tag">&lt;/<span class="hljs-name">additivity</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">ref</span>&gt;</span>${sys:pulsar.log.appender}<span class="hljs-tag">&lt;/<span class="hljs-name">ref</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Logger</span>&gt;</span><br /></code></pre>
<p>You can be even more specific, for example by applying a more verbose log level to a single class in the module:</p>
<pre><code class="hljs css language-xml"> <span class="hljs-tag">&lt;<span class="hljs-name">Logger</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>com.example.module.className<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">level</span>&gt;</span>debug<span class="hljs-tag">&lt;/<span class="hljs-name">level</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">additivity</span>&gt;</span>false<span class="hljs-tag">&lt;/<span class="hljs-name">additivity</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">ref</span>&gt;</span>Console<span class="hljs-tag">&lt;/<span class="hljs-name">ref</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">AppenderRef</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">Logger</span>&gt;</span><br /></code></pre>
<p>Each <code>&lt;AppenderRef&gt;</code> entry allows you to output the log to a target specified in the definition of the Appender.</p>
<p>Additivity pertains to whether log messages will be duplicated if multiple Logger entries overlap.
To disable additivity, specify</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">additivity</span>&gt;</span>false<span class="hljs-tag">&lt;/<span class="hljs-name">additivity</span>&gt;</span><br /></code></pre>
<p>as shown in examples above. Disabling additivity prevents duplication of log messages when one or more <code>&lt;Logger&gt;</code> entries contain classes or modules that overlap.</p>
<p>The <code>&lt;AppenderRef&gt;</code> is defined in the <code>&lt;Appenders&gt;</code> section, such as:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">Console</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">name</span>&gt;</span>Console<span class="hljs-tag">&lt;/<span class="hljs-name">name</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">target</span>&gt;</span>SYSTEM_OUT<span class="hljs-tag">&lt;/<span class="hljs-name">target</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">PatternLayout</span>&gt;</span><br /> <span class="hljs-tag">&lt;<span class="hljs-name">Pattern</span>&gt;</span>%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level %logger{36} - %msg%n<span class="hljs-tag">&lt;/<span class="hljs-name">Pattern</span>&gt;</span><br /> <span class="hljs-tag">&lt;/<span class="hljs-name">PatternLayout</span>&gt;</span><br /><span class="hljs-tag">&lt;/<span class="hljs-name">Console</span>&gt;</span><br /></code></pre>
</span></div></div><div id="tab-group-2049-content-2051" class="tab-pane" data-group="group_2049" tabindex="-1"><div><span><p>Pulsar Functions that use the Python SDK have access to a logging object that can be used to produce logs at the chosen log level. The following example function logs either a <code>WARNING</code>- or <code>INFO</code>-level log based on whether the incoming string contains the word <code>danger</code>.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LoggingFunction</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br /> logger = context.get_logger()<br /> msg_id = context.get_message_id()<br /> <span class="hljs-keyword">if</span> <span class="hljs-string">'danger'</span> <span class="hljs-keyword">in</span> input:<br /> logger.warn(<span class="hljs-string">"A warning was received in message {0}"</span>.format(context.get_message_id()))<br /> <span class="hljs-keyword">else</span>:<br /> logger.info(<span class="hljs-string">"Message {0} received\nContent: {1}"</span>.format(msg_id, input))<br /></code></pre>
<p>If you want your function to produce logs on a Pulsar topic, you need to specify a <strong>log topic</strong> when creating or running the function. The following is an example.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br /> --py logging_function.py \<br /> --classname logging_function.LoggingFunction \<br /> --<span class="hljs-built_in">log</span>-topic logging-function-logs \<br /> <span class="hljs-comment"># Other function configs</span><br /></code></pre>
<p>All logs produced by <code>LoggingFunction</code> above can be accessed via the <code>logging-function-logs</code> topic.
Additionally, you can specify the function log level through the broker XML file as described in <a href="#customize-function-log-level">Customize Function log level</a>.</p>
</span></div></div><div id="tab-group-2049-content-2052" class="tab-pane" data-group="group_2049" tabindex="-1"><div><span><p>The following Go Function example shows different log levels based on the function input.</p>
<pre><code class="hljs">import (<br /> <span class="hljs-string">"context"</span><br /><br /> <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/pf"</span><br /><br /> log <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/logutil"</span><br />)<br /><br />func logger<span class="hljs-constructor">Func(<span class="hljs-params">ctx</span> <span class="hljs-params">context</span>.Context, <span class="hljs-params">input</span> []<span class="hljs-params">byte</span>)</span> {<br /> <span class="hljs-keyword">if</span> len(input) &lt;= <span class="hljs-number">100</span> {<br /> log.<span class="hljs-constructor">Infof(<span class="hljs-string">"This input has a length of: %d"</span>, <span class="hljs-params">len</span>(<span class="hljs-params">input</span>)</span>)<br /> } <span class="hljs-keyword">else</span> {<br /> log.<span class="hljs-constructor">Warnf(<span class="hljs-string">"This input is getting too long! It has {%d} characters"</span>, <span class="hljs-params">len</span>(<span class="hljs-params">input</span>)</span>)<br /> }<br />}<br /><br />func main<span class="hljs-literal">()</span> {<br /> pf.<span class="hljs-constructor">Start(<span class="hljs-params">loggerFunc</span>)</span><br />}<br /></code></pre>
<p>When you use <code>logTopic</code> related functionalities in Go Function, import <code>github.com/apache/pulsar/pulsar-function-go/logutil</code>, and you do not have to use the <code>getLogger()</code> context object.</p>
<p>Additionally, you can specify the function log level through the broker XML file, as described here: <a href="#customize-function-log-level">Customize Function log level</a></p>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="pulsar-admin"></a><a href="#pulsar-admin" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Pulsar admin</h3>
<p>Pulsar Functions that use the Java SDK have access to the Pulsar admin client, which allows them to make admin API calls to the current Pulsar cluster or to external clusters (if <code>external-pulsars</code> is provided).</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2053-tab-2054" class="nav-link active" data-group="group_2053" data-tab="tab-group-2053-content-2054">Java</div></div><div class="tab-content"><div id="tab-group-2053-content-2054" class="tab-pane active" data-group="group_2053" tabindex="-1"><div><span><p>Below is an example of how to use the Pulsar admin client exposed from the Function <code>context</code>.</p>
<pre><code class="hljs"><span class="hljs-keyword">import</span> org.apache.pulsar.client.admin.PulsarAdmin;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.<span class="hljs-built_in">Function</span>;<br /><br /><span class="hljs-comment"><span class="markdown">/**<br /><span class="hljs-bullet"> * </span>In this particular example, for every input message,<br /><span class="hljs-bullet"> * </span></span>the<span class="markdown"> function resets </span>the<span class="markdown"> cursor of </span>the<span class="markdown"> current function's subscription to </span>a<span class="markdown"><span class="hljs-bullet"><br /> * </span>specified timestamp.<br /> */</span></span><br />public <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">CursorManagementFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{<br /><br /> <span class="hljs-meta">@Override</span><br /> public <span class="hljs-built_in">String</span> process(<span class="hljs-built_in">String</span> input, Context context) throws Exception {<br /> PulsarAdmin adminClient = context.getPulsarAdmin();<br /> <span class="hljs-keyword">if</span> (adminClient != <span class="hljs-keyword">null</span>) {<br /> <span class="hljs-built_in">String</span> topic = context.getCurrentRecord().getTopicName().isPresent() ?<br /> context.getCurrentRecord().getTopicName().<span class="hljs-keyword">get</span>() : <span class="hljs-keyword">null</span>;<br /> <span class="hljs-built_in">String</span> subName = context.getTenant() + <span class="hljs-string">"/"</span> + context.getNamespace() + <span class="hljs-string">"/"</span> + context.getFunctionName();<br /> <span class="hljs-keyword">if</span> (topic != <span 
class="hljs-keyword">null</span>) {<br /> <span class="hljs-comment">// 1578188166 below is a random-pick timestamp</span><br /> adminClient.topics().resetCursor(topic, subName, <span class="hljs-number">1578188166</span>);<br /> <span class="hljs-keyword">return</span> <span class="hljs-string">"reset cursor successfully"</span>;<br /> }<br /> }<br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
<p>If you want your function to get access to the Pulsar admin client, you need to enable this feature by setting <code>exposeAdminClientEnabled=true</code> in the <code>functions_worker.yml</code> file. You can test whether this feature is enabled or not using the command <code>pulsar-admin functions localrun</code> with the flag <code>--web-service-url</code>.</p>
<pre><code class="hljs">$ bin/pulsar-<span class="hljs-keyword">admin</span> <span class="hljs-keyword">functions</span> localrun \<br /> <span class="hljs-comment">--jar my-functions.jar \</span><br /> <span class="hljs-comment">--classname my.package.CursorManagementFunction \</span><br /> <span class="hljs-comment">--web-service-url http://pulsar-web-service:8080 \</span><br /> # Other <span class="hljs-keyword">function</span> configs<br /></code></pre>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="metrics"></a><a href="#metrics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Metrics</h2>
<p>Pulsar Functions allows you to deploy and manage processing functions that consume messages from and publish messages to Pulsar topics easily. It is important to ensure that the running functions are healthy at any time. Pulsar Functions can publish arbitrary metrics to the metrics interface which can be queried.</p>
<blockquote>
<p><strong>Note</strong></p>
<p>If a Pulsar Function uses the language-native interface for Java or Python, that function is not able to publish metrics and stats to Pulsar.</p>
</blockquote>
<p>You can monitor Pulsar Functions that have been deployed with the following methods:</p>
<ul>
<li><p>Check the metrics provided by Pulsar.</p>
<p>Pulsar Functions expose the metrics that can be collected and used for monitoring the health of <strong>Java, Python, and Go</strong> functions. You can check the metrics by following the <a href="/docs/en/2.9.2/deploy-monitoring">monitoring</a> guide.</p>
<p>For the complete list of the function metrics, see <a href="/docs/en/2.9.2/reference-metrics#pulsar-functions">here</a>.</p></li>
<li><p>Set and check your customized metrics.</p>
<p>In addition to the metrics provided by Pulsar, Pulsar allows you to customize metrics for <strong>Java and Python</strong> functions. Function workers collect user-defined metrics to Prometheus automatically and you can check them in Grafana.</p></li>
</ul>
<p>Here are examples of how to customize metrics for Java and Python functions.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2055-tab-2056" class="nav-link active" data-group="group_2055" data-tab="tab-group-2055-content-2056">Java</div><div id="tab-group-2055-tab-2057" class="nav-link" data-group="group_2055" data-tab="tab-group-2055-content-2057">Python</div><div id="tab-group-2055-tab-2058" class="nav-link" data-group="group_2055" data-tab="tab-group-2055-content-2058">Go</div></div><div class="tab-content"><div id="tab-group-2055-content-2056" class="tab-pane active" data-group="group_2055" tabindex="-1"><div><span><p>You can record metrics using the <a href="#context"><code>Context</code></a> object on a per-key basis. For example, you can set a metric for the <code>hit-count</code> key and a different metric for the <code>elevens-count</code> key every time the function processes a message.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MetricRecorderFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">Integer</span>, <span class="hljs-title">Void</span>&gt; </span>{<br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(Integer input, Context context)</span> </span>{<br /> <span class="hljs-comment">// Records the metric 1 every time a message arrives</span><br /> context.recordMetric(<span class="hljs-string">"hit-count"</span>, <span class="hljs-number">1</span>);<br /><br /> <span class="hljs-comment">// Records the metric only if the arriving number equals 11</span><br /> <span class="hljs-keyword">if</span> (input == <span class="hljs-number">11</span>) {<br /> context.recordMetric(<span class="hljs-string">"elevens-count"</span>, <span class="hljs-number">1</span>);<br /> }<br /><br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
</span></div></div><div id="tab-group-2055-content-2057" class="tab-pane" data-group="group_2055" tabindex="-1"><div><span><p>You can record metrics using the <a href="#context"><code>Context</code></a> object on a per-key basis. For example, you can set a metric for the <code>hit-count</code> key and a different metric for the <code>elevens-count</code> key every time the function processes a message. The following is an example.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MetricRecorderFunction</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br /> context.record_metric(<span class="hljs-string">'hit-count'</span>, <span class="hljs-number">1</span>)<br /><br /> <span class="hljs-keyword">if</span> input == <span class="hljs-number">11</span>:<br /> context.record_metric(<span class="hljs-string">'elevens-count'</span>, <span class="hljs-number">1</span>)<br /></code></pre>
</span></div></div><div id="tab-group-2055-content-2058" class="tab-pane" data-group="group_2055" tabindex="-1"><div><span><p>Currently, the feature is not available in Go.</p>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="security"></a><a href="#security" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Security</h2>
<p>If you want to enable security on Pulsar Functions, first you should enable security on <a href="/docs/en/2.9.2/functions-worker">Functions Workers</a>. For more details, refer to <a href="/docs/en/2.9.2/functions-worker#security-settings">Security settings</a>.</p>
<p>Pulsar Functions can support the following providers:</p>
<ul>
<li>ClearTextSecretsProvider</li>
<li>EnvironmentBasedSecretsProvider</li>
</ul>
<blockquote>
<p>Pulsar Functions support ClearTextSecretsProvider by default.</p>
</blockquote>
<p>At the same time, Pulsar Functions provides two interfaces, <strong>SecretsProvider</strong> and <strong>SecretsProviderConfigurator</strong>, allowing users to customize the secret provider.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2059-tab-2060" class="nav-link active" data-group="group_2059" data-tab="tab-group-2059-content-2060">Java</div><div id="tab-group-2059-tab-2061" class="nav-link" data-group="group_2059" data-tab="tab-group-2059-content-2061">Python</div><div id="tab-group-2059-tab-2062" class="nav-link" data-group="group_2059" data-tab="tab-group-2059-content-2062">Go</div></div><div class="tab-content"><div id="tab-group-2059-content-2060" class="tab-pane active" data-group="group_2059" tabindex="-1"><div><span><p>You can get secret provider using the <a href="#context"><code>Context</code></a> object. The following is an example:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">GetSecretProviderFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br /><br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> <span class="hljs-keyword">throws</span> Exception </span>{<br /> Logger LOG = context.getLogger();<br /> String secretProvider = context.getSecret(input);<br /><br /> <span class="hljs-keyword">if</span> (!secretProvider.isEmpty()) {<br /> LOG.info(<span class="hljs-string">"The secret provider is {}"</span>, secretProvider);<br /> } <span class="hljs-keyword">else</span> {<br /> LOG.warn(<span class="hljs-string">"No secret provider"</span>);<br /> }<br /><br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
</span></div></div><div id="tab-group-2059-content-2061" class="tab-pane" data-group="group_2059" tabindex="-1"><div><span><p>You can get secret provider using the <a href="#context"><code>Context</code></a> object. The following is an example:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">GetSecretProviderFunction</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br /> logger = context.get_logger()<br /> secret_provider = context.get_secret(input)<br /> <span class="hljs-keyword">if</span> secret_provider <span class="hljs-keyword">is</span> <span class="hljs-literal">None</span>:<br /> logger.warn(<span class="hljs-string">'No secret provider'</span>)<br /> <span class="hljs-keyword">else</span>:<br /> logger.info(<span class="hljs-string">"The secret provider is {0}"</span>.format(secret_provider))<br /></code></pre>
</span></div></div><div id="tab-group-2059-content-2062" class="tab-pane" data-group="group_2059" tabindex="-1"><div><span><p>Currently, the feature is not available in Go.</p>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="state-storage"></a><a href="#state-storage" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>State storage</h2>
<p>Pulsar Functions use <a href="https://bookkeeper.apache.org">Apache BookKeeper</a> as a state storage interface. Pulsar installation, including the local standalone installation, includes deployment of BookKeeper bookies.</p>
<p>Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper <a href="https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f">table service</a> to store the <code>State</code> for functions. For example, a <code>WordCount</code> function can store its <code>counters</code> state into BookKeeper table service via Pulsar Functions State API.</p>
<p>States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.</p>
<p>You can access states within Pulsar Java Functions using the <code>putState</code>, <code>putStateAsync</code>, <code>getState</code>, <code>getStateAsync</code>, <code>incrCounter</code>, <code>incrCounterAsync</code>, <code>getCounter</code>, <code>getCounterAsync</code> and <code>deleteState</code> calls on the context object. You can access states within Pulsar Python Functions using the <code>putState</code>, <code>getState</code>, <code>incrCounter</code>, <code>getCounter</code> and <code>deleteState</code> calls on the context object. You can also manage states using the <a href="#query-state">querystate</a> and <a href="#putstate">putstate</a> options to <code>pulsar-admin functions</code>.</p>
<blockquote>
<p>Note<br>
State storage is not available in Go.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="api"></a><a href="#api" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>API</h3>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2063-tab-2064" class="nav-link active" data-group="group_2063" data-tab="tab-group-2063-content-2064">Java</div><div id="tab-group-2063-tab-2065" class="nav-link" data-group="group_2063" data-tab="tab-group-2063-content-2065">Python</div></div><div class="tab-content"><div id="tab-group-2063-content-2064" class="tab-pane active" data-group="group_2063" tabindex="-1"><div><span><p>Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the <a href="/docs/en/2.9.2/functions-develop#context">Context</a> object when you are using Java SDK functions.</p>
<h4><a class="anchor" aria-hidden="true" id="incrcounter"></a><a href="#incrcounter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>incrCounter</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Increment the builtin distributed counter referred by key<br /> * <span class="hljs-doctag">@param</span> key The name of the key<br /> * <span class="hljs-doctag">@param</span> amount The amount to be incremented<br /> */</span><br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">incrCounter</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br /></code></pre>
<p>The application can use <code>incrCounter</code> to change the counter of a given <code>key</code> by the given <code>amount</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="incrcounterasync"></a><a href="#incrcounterasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>incrCounterAsync</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Increment the builtin distributed counter referred by key<br /> * but don't wait for the completion of the increment operation<br /> *<br /> * <span class="hljs-doctag">@param</span> key The name of the key<br /> * <span class="hljs-doctag">@param</span> amount The amount to be incremented<br /> */</span><br /> <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">incrCounterAsync</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br /></code></pre>
<p>The application can use <code>incrCounterAsync</code> to asynchronously change the counter of a given <code>key</code> by the given <code>amount</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="getcounter"></a><a href="#getcounter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getCounter</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Retrieve the counter value for the key.<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> * <span class="hljs-doctag">@return</span> the amount of the counter value for this key<br /> */</span><br /> <span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">getCounter</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<p>The application can use <code>getCounter</code> to retrieve the counter of a given <code>key</code> mutated by <code>incrCounter</code>.</p>
<p>In addition to the <code>counter</code> API, Pulsar also exposes a general key/value API for functions to store
general key/value state.</p>
<h4><a class="anchor" aria-hidden="true" id="getcounterasync"></a><a href="#getcounterasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getCounterAsync</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Retrieve the counter value for the key, but don't wait<br /> * for the operation to be completed<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> * <span class="hljs-doctag">@return</span> the amount of the counter value for this key<br /> */</span><br /> <span class="hljs-function">CompletableFuture&lt;Long&gt; <span class="hljs-title">getCounterAsync</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<p>The application can use <code>getCounterAsync</code> to asynchronously retrieve the counter of a given <code>key</code> mutated by <code>incrCounterAsync</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="putstate"></a><a href="#putstate" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>putState</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Update the state value for the key.<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> * <span class="hljs-doctag">@param</span> value state value of the key<br /> */</span><br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">putState</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="putstateasync"></a><a href="#putstateasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>putStateAsync</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Update the state value for the key, but don't wait for the operation to be completed<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> * <span class="hljs-doctag">@param</span> value state value of the key<br /> */</span><br /> <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">putStateAsync</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br /></code></pre>
<p>The application can use <code>putStateAsync</code> to asynchronously update the state of a given <code>key</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="getstate"></a><a href="#getstate" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getState</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Retrieve the state value for the key.<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> * <span class="hljs-doctag">@return</span> the state value for the key.<br /> */</span><br /> <span class="hljs-function">ByteBuffer <span class="hljs-title">getState</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="getstateasync"></a><a href="#getstateasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getStateAsync</h4>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Retrieve the state value for the key, but don't wait for the operation to be completed<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> * <span class="hljs-doctag">@return</span> the state value for the key.<br /> */</span><br /> <span class="hljs-function">CompletableFuture&lt;ByteBuffer&gt; <span class="hljs-title">getStateAsync</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<p>The application can use <code>getStateAsync</code> to asynchronously retrieve the state of a given <code>key</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="deletestate"></a><a href="#deletestate" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>deleteState</h4>
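<pre><code class="hljs css language-java"> <span class="hljs-comment">/**<br /> * Delete the state value for the key.<br /> *<br /> * <span class="hljs-doctag">@param</span> key name of the key<br /> */</span><br /> <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">deleteState</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>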
<p>Counters and binary values share the same keyspace, so this deletes either type.</p>
</span></div></div><div id="tab-group-2063-content-2065" class="tab-pane" data-group="group_2063" tabindex="-1"><div><span><p>Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the <a href="#context">Context</a> object when you are using Python SDK functions.</p>
<h4><a class="anchor" aria-hidden="true" id="incr_counter"></a><a href="#incr_counter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>incr_counter</h4>
<pre><code class="hljs css language-python"> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">incr_counter</span><span class="hljs-params">(self, key, amount)</span>:</span><br /> <span class="hljs-string">"""incr the counter of a given key in the managed state"""</span><br /></code></pre>
<p>The application can use <code>incr_counter</code> to change the counter of a given <code>key</code> by the given <code>amount</code>.
If the <code>key</code> does not exist, a new key is created.</p>
<h4><a class="anchor" aria-hidden="true" id="get_counter"></a><a href="#get_counter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>get_counter</h4>
<pre><code class="hljs css language-python"> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_counter</span><span class="hljs-params">(self, key)</span>:</span><br /> <span class="hljs-string">"""get the counter of a given key in the managed state"""</span><br /></code></pre>
<p>The application can use <code>get_counter</code> to retrieve the counter of a given <code>key</code> mutated by <code>incr_counter</code>.</p>
<p>Besides the <code>counter</code> API, Pulsar also exposes a general key/value API for functions to store
general key/value state.</p>
<h4><a class="anchor" aria-hidden="true" id="put_state"></a><a href="#put_state" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>put_state</h4>
<pre><code class="hljs css language-python"> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">put_state</span><span class="hljs-params">(self, key, value)</span>:</span><br /> <span class="hljs-string">"""update the value of a given key in the managed state"""</span><br /></code></pre>
<p>The key is a string, and the value is arbitrary binary data.</p>
<h4><a class="anchor" aria-hidden="true" id="get_state"></a><a href="#get_state" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>get_state</h4>
<pre><code class="hljs css language-python"> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_state</span><span class="hljs-params">(self, key)</span>:</span><br /> <span class="hljs-string">"""get the value of a given key in the managed state"""</span><br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="del_counter"></a><a href="#del_counter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>del_counter</h4>
<pre><code class="hljs css language-python"> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">del_counter</span><span class="hljs-params">(self, key)</span>:</span><br /> <span class="hljs-string">"""delete the counter of a given key in the managed state"""</span><br /></code></pre>
<p>Counters and binary values share the same keyspace, so this deletes either type.</p>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="query-state"></a><a href="#query-state" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query State</h3>
<p>A Pulsar Function can use the <a href="#api">State API</a> for storing state into Pulsar's state storage
and retrieving state back from Pulsar's state storage. Additionally, Pulsar provides
CLI commands for querying its state.</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> bin/pulsar-admin <span class="hljs-built_in">functions</span> querystate \</span>
--tenant &lt;tenant&gt; \
--namespace &lt;namespace&gt; \
--name &lt;function-name&gt; \
--state-storage-url &lt;bookkeeper-service-url&gt; \
--key &lt;state-key&gt; \
[--watch]
</code></pre>
<p>If <code>--watch</code> is specified, the CLI will watch the value of the provided <code>state-key</code>.</p>
<h3><a class="anchor" aria-hidden="true" id="example-1"></a><a href="#example-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-2066-tab-2067" class="nav-link active" data-group="group_2066" data-tab="tab-group-2066-content-2067">Java</div><div id="tab-group-2066-tab-2068" class="nav-link" data-group="group_2066" data-tab="tab-group-2066-content-2068">Python</div></div><div class="tab-content"><div id="tab-group-2066-content-2067" class="tab-pane active" data-group="group_2066" tabindex="-1"><div><span><p><a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java"><code>WordCountFunction</code></a>
is a good example
demonstrating how an application can easily store <code>state</code> in Pulsar Functions.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><br /><span class="hljs-keyword">import</span> java.util.Arrays;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordCountFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br /> <span class="hljs-meta">@Override</span><br /> <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> <span class="hljs-keyword">throws</span> Exception </span>{<br /> Arrays.asList(input.split(<span class="hljs-string">"\\."</span>)).forEach(word -&gt; context.incrCounter(word, <span class="hljs-number">1</span>));<br /> <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br /> }<br />}<br /></code></pre>
<p>The logic of this <code>WordCount</code> function is pretty simple and straightforward:</p>
<ol>
<li>The function first splits the received <code>String</code> into multiple words using regex <code>\\.</code>.</li>
<li>For each <code>word</code>, the function increments the corresponding <code>counter</code> by 1 (via <code>incrCounter(key, amount)</code>).</li>
</ol>
</span></div></div><div id="tab-group-2066-content-2068" class="tab-pane" data-group="group_2066" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordCount</span><span class="hljs-params">(Function)</span>:</span><br /> <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, item, context)</span>:</span><br /> <span class="hljs-keyword">for</span> word <span class="hljs-keyword">in</span> item.split():<br /> context.incr_counter(word, <span class="hljs-number">1</span>)<br /></code></pre>
<p>The logic of this <code>WordCount</code> function is pretty simple and straightforward:</p>
<ol>
<li>The function first splits the received string into multiple words on whitespace.</li>
<li>For each <code>word</code>, the function increments the corresponding <code>counter</code> by 1 (via <code>incr_counter(key, amount)</code>).</li>
</ol>
</span></div></div></div></div>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.9.2/functions-worker"><span class="arrow-prev"></span><span>Setup: Pulsar Functions Worker</span></a><a class="docs-next button" href="/docs/en/2.9.2/functions-package"><span>How-to: Package</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#available-apis">Available APIs</a></li><li><a href="#schema-registry">Schema registry</a></li><li><a href="#serde">SerDe</a><ul class="toc-headings"><li><a href="#example">Example</a></li></ul></li><li><a href="#context">Context</a><ul class="toc-headings"><li><a href="#user-config">User config</a></li><li><a href="#logger">Logger</a></li><li><a href="#pulsar-admin">Pulsar admin</a></li></ul></li><li><a href="#metrics">Metrics</a></li><li><a href="#security">Security</a></li><li><a href="#state-storage">State storage</a><ul class="toc-headings"><li><a href="#api">API</a></li><li><a href="#query-state">Query State</a></li><li><a href="#example-1">Example</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Replaces the content of the "#community" navigation entry with a
// click-to-toggle dropdown menu of community links.
//
// Wrapped in an IIFE so the helper bindings stay out of the global scope and
// so the script can return early when the navigation entry is missing.
(function () {
  // querySelector returns null when no element matches; without this guard the
  // `.parentNode` access would throw and abort the rest of the script.
  const communityLink = document.querySelector("a[href='#community']");
  if (!communityLink || !communityLink.parentNode) {
    return;
  }
  const community = communityLink.parentNode;

  // Markup for the dropdown. External links open in a new tab, so they carry
  // rel="noopener noreferrer" to keep the opened page from reaching back via
  // window.opener. The "&#x2750;" glyph marks links that leave the site.
  const communityMenu =
    '<li>' +
    '<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
    '<div id="community-dropdown" class="hide">' +
    '<ul id="community-dropdown-items">' +
    '<li><a href="/en/contact">Contact</a></li>' +
    '<li><a href="/en/contributing">Contributing</a></li>' +
    '<li><a href="/en/coding-guide">Coding guide</a></li>' +
    '<li><a href="/en/events">Events</a></li>' +
    '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
    '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
    '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
    '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
    '<li>&nbsp;</li>' +
    '<li><a href="/en/resources">Resources</a></li>' +
    '<li><a href="/en/team">Team</a></li>' +
    '<li><a href="/en/powered-by">Powered By</a></li>' +
    '</ul>' +
    '</div>' +
    '</li>';

  // NOTE(review): the markup is injected *inside* the link's parent, so if that
  // parent is itself an <li> the result is a nested <li>. Kept as-is because the
  // site's CSS may depend on this structure; confirm before changing.
  community.innerHTML = communityMenu;

  const communityMenuItem = document.getElementById("community-menu");
  const communityDropDown = document.getElementById("community-dropdown");

  // Toggle the dropdown between the "hide" and "visible" classes on click.
  // preventDefault stops the href="#" from jumping to the top of the page.
  communityMenuItem.addEventListener("click", function (event) {
    event.preventDefault();
    communityDropDown.className =
      communityDropDown.className === 'hide' ? 'visible' : 'hide';
  });
})();
</script></span></footer></div><!-- Twitter widgets loader: defines window.twttr, creates a script element with id "twitter-wjs" whose src is https://platform.twitter.com/widgets.js and inserts it before the first script element on the page (returning early if an element with that id already exists), and stores callbacks passed to twttr.ready in the twttr._e array. --><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>