<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Develop Pulsar Functions · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This tutorial walks you through how to develop Pulsar Functions."/><meta name="docsearch:version" content="2.4.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Develop Pulsar Functions · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This tutorial walks you through how to develop Pulsar Functions."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.4.1</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.4.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.4.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.4.1/functions-develop">日本語</a></li><li><a href="/docs/fr/2.4.1/functions-develop">Français</a></li><li><a href="/docs/ko/2.4.1/functions-develop">한국어</a></li><li><a href="/docs/zh-CN/2.4.1/functions-develop">中文</a></li><li><a href="/docs/zh-TW/2.4.1/functions-develop">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
        const languagesMenuItem = document.getElementById("languages-menu");
        const languagesDropDown = document.getElementById("languages-dropdown");
        languagesMenuItem.addEventListener("click", function(event) {
          event.preventDefault();

          if (languagesDropDown.className == "hide") {
            languagesDropDown.className = "visible";
          } else {
            languagesDropDown.className = "hide";
          }
        });
      </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Pulsar Functions</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.4.1/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/io-cdc">CDC Connector</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/sql-getting-started">Get Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-schemas">Schemas</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/reference-connector-admin">Connector Admin CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.1/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
            var coll = document.getElementsByClassName('collapsible');
            var checkActiveCategory = true;
            for (var i = 0; i < coll.length; i++) {
              var links = coll[i].nextElementSibling.getElementsByTagName('*');
              if (checkActiveCategory){
                for (var j = 0; j < links.length; j++) {
                  if (links[j].classList.contains('navListItemActive')){
                    coll[i].nextElementSibling.classList.toggle('hide');
                    coll[i].childNodes[1].classList.toggle('rotate');
                    checkActiveCategory = false;
                    break;
                  }
                }
              }

              coll[i].addEventListener('click', function() {
                var arrow = this.childNodes[1];
                arrow.classList.toggle('rotate');
                var content = this.nextElementSibling;
                content.classList.toggle('hide');
              });
            }

            document.addEventListener('DOMContentLoaded', function() {
              createToggler('#navToggler', '#docsNav', 'docsSliderActive');
              createToggler('#tocToggler', 'body', 'tocActive');

              var headings = document.querySelector('.toc-headings');
              headings && headings.addEventListener('click', function(event) {
                var el = event.target;
                while(el !== headings){
                  if (el.tagName === 'A') {
                    document.body.classList.remove('tocActive');
                    break;
                  } else{
                    el = el.parentNode;
                  }
                }
              }, false);

              function createToggler(togglerSelector, targetSelector, className) {
                var toggler = document.querySelector(togglerSelector);
                var target = document.querySelector(targetSelector);

                if (!toggler) {
                  return;
                }

                toggler.onclick = function(event) {
                  event.preventDefault();

                  target.classList.toggle(className);
                };
              }
            });
        </script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/functions-develop.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Develop Pulsar Functions</h1></header><article><div><span><p>This tutorial walks you through how to develop Pulsar Functions.</p>
<h2><a class="anchor" aria-hidden="true" id="available-apis"></a><a href="#available-apis" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Available APIs</h2>
<p>In Java and Python, you have two options to write Pulsar Functions. In Go, you can use Pulsar Functions SDK for Go.</p>
<table>
<thead>
<tr><th style="text-align:left">Interface</th><th style="text-align:left">Description</th><th style="text-align:left">Use cases</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left">Language-native interface</td><td style="text-align:left">No Pulsar-specific libraries or special dependencies required (only core libraries from Java/Python).</td><td style="text-align:left">Functions that do not require access to the function <a href="#context">context</a>.</td></tr>
<tr><td style="text-align:left">Pulsar Function SDK for Java/Python/Go</td><td style="text-align:left">Pulsar-specific libraries that provide a range of functionality not provided by &quot;native&quot; interfaces.</td><td style="text-align:left">Functions that require access to the function <a href="#context">context</a>.</td></tr>
</tbody>
</table>
<p>The language-native function, which adds an exclamation point to all incoming strings and publishes the resulting string to a topic, has no external dependencies. The following example is language-native function.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10527-tab-10528" class="nav-link active" data-group="group_10527" data-tab="tab-group-10527-content-10528">Java</div><div id="tab-group-10527-tab-10529" class="nav-link" data-group="group_10527" data-tab="tab-group-10527-content-10529">Python</div></div><div class="tab-content"><div id="tab-group-10527-content-10528" class="tab-pane active" data-group="group_10527" tabindex="-1"><div><span><pre><code class="hljs css language-Java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">JavaNativeExclamationFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{<br />    <span class="hljs-meta">@Override</span><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">apply</span><span class="hljs-params">(String input)</span> </span>{<br />        <span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s!"</span>, input);<br />    }<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeExclamationFunction.java">here</a>.</p>
</span></div></div><div id="tab-group-10527-content-10529" class="tab-pane" data-group="group_10527" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(input)</span>:</span><br />    <span class="hljs-keyword">return</span> <span class="hljs-string">"{}!"</span>.format(input)<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/native_exclamation_function.py">here</a>.</p>
<blockquote>
<p>Note
You can write Pulsar Functions in python2 or python3. However, Pulsar only looks for <code>python</code> as the interpreter.</p>
<p>If you're running Pulsar Functions on an Ubuntu system that only supports python3, you might fail to start the functions. In this case, you can create a symlink. Your system will fail if you subsequently install any other package that depends on Python 2.x. A solution is under development in <a href="https://github.com/apache/pulsar/issues/5518">Issue 5518</a>.</p>
<pre><code class="hljs css language-bash">sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10<br /></code></pre>
</blockquote>
</span></div></div></div></div>
<p>The following example uses Pulsar Functions SDK.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10530-tab-10531" class="nav-link active" data-group="group_10530" data-tab="tab-group-10530-content-10531">Java</div><div id="tab-group-10530-tab-10532" class="nav-link" data-group="group_10530" data-tab="tab-group-10530-content-10532">Python</div><div id="tab-group-10530-tab-10533" class="nav-link" data-group="group_10530" data-tab="tab-group-10530-content-10533">Go</div></div><div class="tab-content"><div id="tab-group-10530-content-10531" class="tab-pane active" data-group="group_10530" tabindex="-1"><div><span><pre><code class="hljs css language-Java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ExclamationFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">String</span>&gt; </span>{<br />    <span class="hljs-meta">@Override</span><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> String <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{<br />        <span class="hljs-keyword">return</span> String.format(<span class="hljs-string">"%s!"</span>, input);<br />    }<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java">here</a>.</p>
</span></div></div><div id="tab-group-10530-content-10532" class="tab-pane" data-group="group_10530" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ExclamationFunction</span><span class="hljs-params">(Function)</span>:</span><br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">__init__</span><span class="hljs-params">(self)</span>:</span><br />    <span class="hljs-keyword">pass</span><br /><br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br />    <span class="hljs-keyword">return</span> input + <span class="hljs-string">'!'</span><br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/exclamation_function.py">here</a>.</p>
</span></div></div><div id="tab-group-10530-content-10533" class="tab-pane" data-group="group_10530" tabindex="-1"><div><span><pre><code class="hljs css language-Go"><span class="hljs-keyword">package</span> main<br /><br /><span class="hljs-keyword">import</span> (<br />    <span class="hljs-string">"context"</span><br />    <span class="hljs-string">"fmt"</span><br /><br />    <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/pf"</span><br />)<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">HandleRequest</span><span class="hljs-params">(ctx context.Context, in []<span class="hljs-keyword">byte</span>)</span> <span class="hljs-title">error</span></span>{<br />    fmt.Println(<span class="hljs-keyword">string</span>(in) + <span class="hljs-string">"!"</span>)<br />    <span class="hljs-keyword">return</span> <span class="hljs-literal">nil</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">main</span><span class="hljs-params">()</span></span> {<br />    pf.Start(HandleRequest)<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/77cf09eafa4f1626a53a1fe2e65dd25f377c1127/pulsar-function-go/examples/inputFunc/inputFunc.go#L20-L36">here</a>.</p>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="schema-registry"></a><a href="#schema-registry" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Schema registry</h2>
<p>Pulsar has a built in schema registry and comes bundled with a variety of popular schema types(avro, json and protobuf). Pulsar Functions can leverage existing schema information from input topics and derive the input type. The schema registry applies for output topic as well.</p>
<h2><a class="anchor" aria-hidden="true" id="serde"></a><a href="#serde" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>SerDe</h2>
<p>SerDe stands for <strong>Ser</strong>ialization and <strong>De</strong>serialization. Pulsar Functions uses SerDe when publishing data to and consuming data from Pulsar topics. How SerDe works by default depends on the language you use for a particular function.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10534-tab-10535" class="nav-link active" data-group="group_10534" data-tab="tab-group-10534-content-10535">Java</div><div id="tab-group-10534-tab-10536" class="nav-link" data-group="group_10534" data-tab="tab-group-10534-content-10536">Python</div></div><div class="tab-content"><div id="tab-group-10534-content-10535" class="tab-pane active" data-group="group_10534" tabindex="-1"><div><span><p>When you write Pulsar Functions in Java, the following basic Java types are built in and supported by default:</p>
<ul>
<li><code>String</code></li>
<li><code>Double</code></li>
<li><code>Integer</code></li>
<li><code>Float</code></li>
<li><code>Long</code></li>
<li><code>Short</code></li>
<li><code>Byte</code></li>
</ul>
<p>To customize Java types, you need to implement the following interface.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">SerDe</span>&lt;<span class="hljs-title">T</span>&gt; </span>{<br />    <span class="hljs-function">T <span class="hljs-title">deserialize</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] input)</span></span>;<br />    <span class="hljs-keyword">byte</span>[] serialize(T input);<br />}<br /></code></pre>
</span></div></div><div id="tab-group-10534-content-10536" class="tab-pane" data-group="group_10534" tabindex="-1"><div><span><p>In Python, the default SerDe is identity, meaning that the type is serialized as whatever type the producer function returns.</p>
<p>You can specify the SerDe when <a href="/docs/en/2.4.1/functions-deploy#cluster-mode">creating</a> or <a href="/docs/en/2.4.1/functions-deploy#local-run-mode">running</a> functions.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br />  --tenant public \<br />  --namespace default \<br />  --name my_function \<br />  --py my_function.py \<br />  --classname my_function.MyFunction \<br />  --custom-serde-inputs <span class="hljs-string">'{"input-topic-1":"Serde1","input-topic-2":"Serde2"}'</span> \<br />  --output-serde-classname Serde3 \<br />  --output output-topic-1<br /></code></pre>
<p>This case contains two input topics: <code>input-topic-1</code> and <code>input-topic-2</code>, each of which is mapped to a different SerDe class (the map must be specified as a JSON string). The output topic, <code>output-topic-1</code>, uses the <code>Serde3</code> class for SerDe. At the moment, all Pulsar Functions logic, include processing function and SerDe classes, must be contained within a single Python file.</p>
<p>When using Pulsar Functions for Python, you have three SerDe options:</p>
<ol>
<li>You can use the <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L70"><code>IdentitySerde</code></a>, which leaves the data unchanged. The <code>IdentitySerDe</code> is the <strong>default</strong>. Creating or running a function without explicitly specifying SerDe means that this option is used.</li>
<li>You can use the <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L62"><code>PickleSerDe</code></a>, which uses Python <a href="https://docs.python.org/3/library/pickle.html"><code>pickle</code></a> for SerDe.</li>
<li>You can create a custom SerDe class by implementing the baseline <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L50"><code>SerDe</code></a> class, which has just two methods: <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L53"><code>serialize</code></a> for converting the object into bytes, and <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/functions/serde.py#L58"><code>deserialize</code></a> for converting bytes into an object of the required application-specific type.</li>
</ol>
<p>The table below shows when you should use each SerDe.</p>
<table>
<thead>
<tr><th style="text-align:left">SerDe option</th><th style="text-align:left">When to use</th></tr>
</thead>
<tbody>
<tr><td style="text-align:left"><code>IdentitySerde</code></td><td style="text-align:left">When you work with simple types like strings, Booleans, integers.</td></tr>
<tr><td style="text-align:left"><code>PickleSerDe</code></td><td style="text-align:left">When you work with complex, application-specific types and are comfortable with the &quot;best effort&quot; approach of <code>pickle</code>.</td></tr>
<tr><td style="text-align:left">Custom SerDe</td><td style="text-align:left">When you require explicit control over SerDe, potentially for performance or data compatibility purposes.</td></tr>
</tbody>
</table>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="example"></a><a href="#example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<p>Imagine that you're writing Pulsar Functions that are processing tweet objects, you can refer to the following example of <code>Tweet</code> class.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10537-tab-10538" class="nav-link active" data-group="group_10537" data-tab="tab-group-10537-content-10538">Java</div><div id="tab-group-10537-tab-10539" class="nav-link" data-group="group_10537" data-tab="tab-group-10537-content-10539">Python</div></div><div class="tab-content"><div id="tab-group-10537-content-10538" class="tab-pane active" data-group="group_10537" tabindex="-1"><div><span><pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Tweet</span> </span>{<br />    <span class="hljs-keyword">private</span> String username;<br />    <span class="hljs-keyword">private</span> String tweetContent;<br /><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-title">Tweet</span><span class="hljs-params">(String username, String tweetContent)</span> </span>{<br />        <span class="hljs-keyword">this</span>.username = username;<br />        <span class="hljs-keyword">this</span>.tweetContent = tweetContent;<br />    }<br /><br />    <span class="hljs-comment">// Standard setters and getters</span><br />}<br /></code></pre>
<p>To pass <code>Tweet</code> objects directly between Pulsar Functions, you need to provide a custom SerDe class. In the example below, <code>Tweet</code> objects are basically strings in which the username and tweet content are separated by a <code>|</code>.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">package</span> com.example.serde;<br /><br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.SerDe;<br /><br /><span class="hljs-keyword">import</span> java.util.regex.Pattern;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TweetSerde</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">SerDe</span>&lt;<span class="hljs-title">Tweet</span>&gt; </span>{<br />    <span class="hljs-function"><span class="hljs-keyword">public</span> Tweet <span class="hljs-title">deserialize</span><span class="hljs-params">(<span class="hljs-keyword">byte</span>[] input)</span> </span>{<br />        String s = <span class="hljs-keyword">new</span> String(input);<br />        String[] fields = s.split(Pattern.quote(<span class="hljs-string">"|"</span>));<br />        <span class="hljs-keyword">return</span> <span class="hljs-keyword">new</span> Tweet(fields[<span class="hljs-number">0</span>], fields[<span class="hljs-number">1</span>]);<br />    }<br /><br />    <span class="hljs-keyword">public</span> <span class="hljs-keyword">byte</span>[] serialize(Tweet input) {<br />        <span class="hljs-keyword">return</span> <span class="hljs-string">"%s|%s"</span>.format(input.getUsername(), input.getTweetContent()).getBytes();<br />    }<br />}<br /></code></pre>
<p>To apply this customized SerDe to a particular Pulsar Function, you need to:</p>
<ul>
<li>Package the <code>Tweet</code> and <code>TweetSerde</code> classes into a JAR.</li>
<li>Specify a path to the JAR and SerDe class name when deploying the function.</li>
</ul>
<p>The following is an example of <a href="/docs/en/2.4.1/reference-pulsar-admin#create-1"><code>create</code></a> operation.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br />  --jar /path/to/your.jar \<br />  --output-serde-classname com.example.serde.TweetSerde \<br />  <span class="hljs-comment"># Other function attributes</span><br /></code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="custom-serde-classes-must-be-packaged-with-your-function-jars"></a><a href="#custom-serde-classes-must-be-packaged-with-your-function-jars" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Custom SerDe classes must be packaged with your function JARs</h4>
<p>Pulsar does not store your custom SerDe classes separately from your Pulsar Functions. So you need to include your SerDe classes in your function JARs. If not, Pulsar returns an error.</p>
</blockquote>
</span></div></div><div id="tab-group-10537-content-10539" class="tab-pane" data-group="group_10537" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">Tweet</span><span class="hljs-params">(object)</span>:</span><br />    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">__init__</span><span class="hljs-params">(self, username, tweet_content)</span>:</span><br />        self.username = username<br />        self.tweet_content = tweet_content<br /></code></pre>
<p>In order to use this class in Pulsar Functions, you have two options:</p>
<ol>
<li>You can specify <code>PickleSerDe</code>, which applies the <a href="https://docs.python.org/3/library/pickle.html"><code>pickle</code></a> library SerDe.</li>
<li>You can create your own SerDe class. The following is an example.</li>
</ol>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> SerDe<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TweetSerDe</span><span class="hljs-params">(SerDe)</span>:</span><br /><br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">serialize</span><span class="hljs-params">(self, input)</span>:</span><br />      <span class="hljs-keyword">return</span> bytes(<span class="hljs-string">"{0}|{1}"</span>.format(input.username, input.tweet_content))<br /><br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">deserialize</span><span class="hljs-params">(self, input_bytes)</span>:</span><br />      tweet_components = str(input_bytes).split(<span class="hljs-string">'|'</span>)<br />      <span class="hljs-keyword">return</span> Tweet(tweet_components[<span class="hljs-number">0</span>], tweet_componentsp[<span class="hljs-number">1</span>])<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/python-examples/custom_object_function.py">here</a>.</p>
</span></div></div></div></div>
<p>In both languages, however, you can write custom SerDe logic for more complex, application-specific types.</p>
<h2><a class="anchor" aria-hidden="true" id="context"></a><a href="#context" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Context</h2>
<p>Java, Python and Go SDKs provide access to a <strong>context object</strong> that can be used by a function. This context object provides a wide variety of information and functionality to the function.</p>
<ul>
<li>The name and ID of a Pulsar Function.</li>
<li>The message ID of each message. Each Pulsar message is automatically assigned with an ID.</li>
<li>The key, event time, properties and partition key of each message.</li>
<li>The name of the topic to which the message is sent.</li>
<li>The names of all input topics as well as the output topic associated with the function.</li>
<li>The name of the class used for <a href="#serde">SerDe</a>.</li>
<li>The <a href="/docs/en/2.4.1/reference-terminology#tenant">tenant</a> and namespace associated with the function.</li>
<li>The ID of the Pulsar Functions instance running the function.</li>
<li>The version of the function.</li>
<li>The <a href="/docs/en/2.4.1/functions-develop#logger">logger object</a> used by the function, which can be used to create function log messages.</li>
<li>Access to arbitrary <a href="#user-config">user configuration</a> values supplied via the CLI.</li>
<li>An interface for recording <a href="#metrics">metrics</a>.</li>
<li>An interface for storing and retrieving state in <a href="#state-storage">state storage</a>.</li>
<li>A function to publish new messages onto arbitrary topics.</li>
<li>A function to ack the message being processed (if auto-ack is disabled).</li>
</ul>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10540-tab-10541" class="nav-link active" data-group="group_10540" data-tab="tab-group-10540-content-10541">Java</div><div id="tab-group-10540-tab-10542" class="nav-link" data-group="group_10540" data-tab="tab-group-10540-content-10542">Python</div><div id="tab-group-10540-tab-10543" class="nav-link" data-group="group_10540" data-tab="tab-group-10540-content-10543">Go</div></div><div class="tab-content"><div id="tab-group-10540-content-10541" class="tab-pane active" data-group="group_10540" tabindex="-1"><div><span><p>The <a href="https://github.com/apache/pulsar/blob/master/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java">Context</a> interface provides a number of methods that you can use to access the function <a href="#context">context</a>. The various method signatures for the <code>Context</code> interface are listed as follows.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">Context</span> </span>{<br />    Record&lt;?&gt; getCurrentRecord();<br />    <span class="hljs-function">Collection&lt;String&gt; <span class="hljs-title">getInputTopics</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getOutputTopic</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getOutputSchemaType</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getTenant</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getNamespace</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getFunctionName</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getFunctionId</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getInstanceId</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">String <span class="hljs-title">getFunctionVersion</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">Logger <span class="hljs-title">getLogger</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">incrCounter</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">incrCounterAsync</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">getCounter</span><span class="hljs-params">(String key)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">getCounterAsync</span><span class="hljs-params">(String key)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">putState</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">putStateAsync</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">deleteState</span><span class="hljs-params">(String key)</span></span>;<br />    <span class="hljs-function">ByteBuffer <span class="hljs-title">getState</span><span class="hljs-params">(String key)</span></span>;<br />    <span class="hljs-function">ByteBuffer <span class="hljs-title">getStateAsync</span><span class="hljs-params">(String key)</span></span>;<br />    <span class="hljs-function">Map&lt;String, Object&gt; <span class="hljs-title">getUserConfigMap</span><span class="hljs-params">()</span></span>;<br />    <span class="hljs-function">Optional&lt;Object&gt; <span class="hljs-title">getUserConfigValue</span><span class="hljs-params">(String key)</span></span>;<br />    <span class="hljs-function">Object <span class="hljs-title">getUserConfigValueOrDefault</span><span class="hljs-params">(String key, Object defaultValue)</span></span>;<br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">recordMetric</span><span class="hljs-params">(String metricName, <span class="hljs-keyword">double</span> value)</span></span>;<br />    &lt;O&gt; <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">publish</span><span class="hljs-params">(String topicName, O object, String schemaOrSerdeClassName)</span></span>;<br />    &lt;O&gt; <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">publish</span><span class="hljs-params">(String topicName, O object)</span></span>;<br />    &lt;O&gt; <span class="hljs-function">TypedMessageBuilder&lt;O&gt; <span class="hljs-title">newOutputMessage</span><span class="hljs-params">(String topicName, Schema&lt;O&gt; schema)</span> <span class="hljs-keyword">throws</span> PulsarClientException</span>;<br />}<br /></code></pre>
<p>The following example uses several methods available via the <code>Context</code> object.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">import</span> java.util.stream.Collectors;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ContextFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br />    <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> </span>{<br />        Logger LOG = context.getLogger();<br />        String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(<span class="hljs-string">", "</span>));<br />        String functionName = context.getFunctionName();<br /><br />        String logMessage = String.format(<span class="hljs-string">"A message with a value of \"%s\" has arrived on one of the following topics: %s\n"</span>,<br />                input,<br />                inputTopics);<br /><br />        LOG.info(logMessage);<br /><br />        String metricName = String.format(<span class="hljs-string">"function-%s-messages-received"</span>, functionName);<br />        context.recordMetric(metricName, <span class="hljs-number">1</span>);<br /><br />        <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br />    }<br />}<br /></code></pre>
</span></div></div><div id="tab-group-10540-content-10542" class="tab-pane" data-group="group_10540" tabindex="-1"><div><span><pre><code class="hljs"><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ContextImpl</span>(<span class="hljs-title">pulsar</span>.<span class="hljs-title">Context</span>):</span><br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_id</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_key</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_eventtime</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_message_properties</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_current_message_topic_name</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_partition_key</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_name</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_tenant</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_namespace</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_id</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_instance_id</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_function_version</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_logger</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_user_config_value</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_user_config_map</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">record_metric</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, metric_name, metric_value)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_input_topics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_output_topic</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_output_serde_class_name</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">publish</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, topic_name, message, serde_class_name=<span class="hljs-string">"serde.IdentitySerDe"</span>,<br />              properties=None, compression_type=None, callback=None, message_conf=None)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">ack</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, msgid, topic)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_and_reset_metrics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">reset_metrics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_metrics</span><span class="hljs-params">(<span class="hljs-keyword">self</span>)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">incr_counter</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key, amount)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_counter</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">del_counter</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">put_state</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key, value)</span></span>:<br />    ...<br />  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_state</span><span class="hljs-params">(<span class="hljs-keyword">self</span>, key)</span></span>:<br />    ...<br /></code></pre>
</span></div></div><div id="tab-group-10540-content-10543" class="tab-pane" data-group="group_10540" tabindex="-1"><div><span><pre><code class="hljs"><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetInstanceID</span>() int {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.instanceID<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetInputTopics</span>() []string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.inputTopics<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetOutputTopic</span>() string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">GetSink</span>().<span class="hljs-type">Topic</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncTenant</span>() string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">Tenant</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncName</span>() string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">Name</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncNamespace</span>() string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcDetails.<span class="hljs-type">Namespace</span><br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncID</span>() string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcID<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetFuncVersion</span>() string {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.instanceConf.funcVersion<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetUserConfValue</span>(key string) interface{} {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.userConfigs[key]<br />}<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-params">(<span class="hljs-built_in">c</span> *FunctionContext)</span></span> <span class="hljs-type">GetUserConfMap</span>() <span class="hljs-built_in">map</span>[string]interface{} {<br />    <span class="hljs-keyword">return</span> <span class="hljs-built_in">c</span>.userConfigs<br />}<br /></code></pre>
<p>The following example uses several methods available via the <code>Context</code> object.</p>
<pre><code class="hljs"><span class="hljs-keyword">import</span> (<br />    <span class="hljs-string">"context"</span><br />    <span class="hljs-string">"fmt"</span><br /><br />    <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/pf"</span><br />)<br /><br /><span class="hljs-function"><span class="hljs-keyword">func</span> <span class="hljs-title">contextFunc</span><span class="hljs-params">(ctx context.Context)</span></span> {<br />    <span class="hljs-keyword">if</span> fc, ok := pf.FromContext(ctx); ok {<br />        fmt.Printf(<span class="hljs-string">"function ID is:%s, "</span>, fc.GetFuncID())<br />        fmt.Printf(<span class="hljs-string">"function version is:%s\n"</span>, fc.GetFuncVersion())<br />    }<br />}<br /></code></pre>
<p>For complete code, see <a href="https://github.com/apache/pulsar/blob/77cf09eafa4f1626a53a1fe2e65dd25f377c1127/pulsar-function-go/examples/contextFunc/contextFunc.go#L29-L34">here</a>.</p>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="user-config"></a><a href="#user-config" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>User config</h3>
<p>When you run or update Pulsar Functions created using SDK, you can pass arbitrary key/values to them with the command line with the <code>--userConfig</code> flag. Key/values must be specified as JSON. The following function creation command passes a user configured key/value to a function.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \
  --name word-filter \
  <span class="hljs-comment"># Other function configs</span>
  --user-config <span class="hljs-string">'{"forbidden-word":"rosebud"}'</span>
</code></pre>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10544-tab-10545" class="nav-link active" data-group="group_10544" data-tab="tab-group-10544-content-10545">Java</div><div id="tab-group-10544-tab-10546" class="nav-link" data-group="group_10544" data-tab="tab-group-10544-content-10546">Python</div></div><div class="tab-content"><div id="tab-group-10544-content-10545" class="tab-pane active" data-group="group_10544" tabindex="-1"><div><span><p>The Java SDK <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br />  <span class="hljs-comment"># Other function configs</span><br />  --user-config <span class="hljs-string">'{"word-of-the-day":"verdure"}'</span><br /></code></pre>
<p>To access that value in a Java function:</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">import</span> java.util.Optional;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UserConfigFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br />    <span class="hljs-meta">@Override</span><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">apply</span><span class="hljs-params">(String input, Context context)</span> </span>{<br />        Logger LOG = context.getLogger();<br />        Optional&lt;String&gt; wotd = context.getUserConfigValue(<span class="hljs-string">"word-of-the-day"</span>);<br />        <span class="hljs-keyword">if</span> (wotd.isPresent()) {<br />            LOG.info(<span class="hljs-string">"The word of the day is {}"</span>, wotd);<br />        } <span class="hljs-keyword">else</span> {<br />            LOG.warn(<span class="hljs-string">"No word of the day provided"</span>);<br />        }<br />        <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br />    }<br />}<br /></code></pre>
<p>The <code>UserConfigFunction</code> function will log the string <code>&quot;The word of the day is verdure&quot;</code> every time the function is invoked (which means every time a message arrives). The <code>word-of-the-day</code> user config will be changed only when the function is updated with a new config value via the command line.</p>
<p>You can also access the entire user config map or set a default value in case no value is present:</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">// Get the whole config map</span><br />Map&lt;String, String&gt; allConfigs = context.getUserConfigMap();<br /><br /><span class="hljs-comment">// Get value or resort to default</span><br />String wotd = context.getUserConfigValueOrDefault(<span class="hljs-string">"word-of-the-day"</span>, <span class="hljs-string">"perspicacious"</span>);<br /></code></pre>
<blockquote>
<p>For all key/value pairs passed to Java functions, both the key <em>and</em> the value are <code>String</code>. To set the value to be a different type, you need to deserialize from the <code>String</code> type.</p>
</blockquote>
</span></div></div><div id="tab-group-10544-content-10546" class="tab-pane" data-group="group_10544" tabindex="-1"><div><span><p>In Python function, you can access the configuration value like this.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordFilter</span><span class="hljs-params">(Function)</span>:</span><br />    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, context, input)</span>:</span><br />        forbidden_word = context.user_config()[<span class="hljs-string">"forbidden-word"</span>]<br /><br />        <span class="hljs-comment"># Don't publish the message if it contains the user-supplied</span><br />        <span class="hljs-comment"># forbidden word</span><br />        <span class="hljs-keyword">if</span> forbidden_word <span class="hljs-keyword">in</span> input:<br />            <span class="hljs-keyword">pass</span><br />        <span class="hljs-comment"># Otherwise publish the message</span><br />        <span class="hljs-keyword">else</span>:<br />            <span class="hljs-keyword">return</span> input<br /></code></pre>
<p>The Python SDK <a href="#context"><code>Context</code></a> object enables you to access key/value pairs provided to Pulsar Functions via the command line (as JSON). The following example passes a key/value pair.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br />  <span class="hljs-comment"># Other function configs \</span><br />  --user-config <span class="hljs-string">'{"word-of-the-day":"verdure"}'</span><br /></code></pre>
<p>To access that value in a Python function:</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">UserConfigFunction</span><span class="hljs-params">(Function)</span>:</span><br />    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br />        logger = context.get_logger()<br />        wotd = context.get_user_config_value(<span class="hljs-string">'word-of-the-day'</span>)<br />        <span class="hljs-keyword">if</span> wotd <span class="hljs-keyword">is</span> <span class="hljs-literal">None</span>:<br />            logger.warn(<span class="hljs-string">'No word of the day provided'</span>)<br />        <span class="hljs-keyword">else</span>:<br />            logger.info(<span class="hljs-string">"The word of the day is {0}"</span>.format(wotd))<br /></code></pre>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="logger"></a><a href="#logger" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Logger</h3>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10547-tab-10548" class="nav-link active" data-group="group_10547" data-tab="tab-group-10547-content-10548">Java</div><div id="tab-group-10547-tab-10549" class="nav-link" data-group="group_10547" data-tab="tab-group-10547-content-10549">Python</div><div id="tab-group-10547-tab-10550" class="nav-link" data-group="group_10547" data-tab="tab-group-10547-content-10550">Go</div></div><div class="tab-content"><div id="tab-group-10547-content-10548" class="tab-pane active" data-group="group_10547" tabindex="-1"><div><span><p>Pulsar Functions that use the Java SDK have access to an <a href="https://www.slf4j.org/">SLF4j</a> <a href="https://www.slf4j.org/api/org/apache/log4j/Logger.html"><code>Logger</code></a> object that can be used to produce logs at the chosen log level. The following example logs either a <code>WARNING</code>- or <code>INFO</code>-level log based on whether the incoming string contains the word <code>danger</code>.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><span class="hljs-keyword">import</span> org.slf4j.Logger;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LoggingFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br />    <span class="hljs-meta">@Override</span><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">apply</span><span class="hljs-params">(String input, Context context)</span> </span>{<br />        Logger LOG = context.getLogger();<br />        String messageId = <span class="hljs-keyword">new</span> String(context.getMessageId());<br /><br />        <span class="hljs-keyword">if</span> (input.contains(<span class="hljs-string">"danger"</span>)) {<br />            LOG.warn(<span class="hljs-string">"A warning was received in message {}"</span>, messageId);<br />        } <span class="hljs-keyword">else</span> {<br />            LOG.info(<span class="hljs-string">"Message {} received\nContent: {}"</span>, messageId, input);<br />        }<br /><br />        <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br />    }<br />}<br /></code></pre>
<p>If you want your function to produce logs, you need to specify a log topic when creating or running the function. The following is an example.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br />  --jar my-functions.jar \<br />  --classname my.package.LoggingFunction \<br />  --<span class="hljs-built_in">log</span>-topic persistent://public/default/logging-function-logs \<br />  <span class="hljs-comment"># Other function configs</span><br /></code></pre>
<p>All logs produced by <code>LoggingFunction</code> above can be accessed via the <code>persistent://public/default/logging-function-logs</code> topic.</p>
</span></div></div><div id="tab-group-10547-content-10549" class="tab-pane" data-group="group_10547" tabindex="-1"><div><span><p>Pulsar Functions that use the Python SDK have access to a logging object that can be used to produce logs at the chosen log level. The following example function that logs either a <code>WARNING</code>- or <code>INFO</code>-level log based on whether the incoming string contains the word <code>danger</code>.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">LoggingFunction</span><span class="hljs-params">(Function)</span>:</span><br />    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br />        logger = context.get_logger()<br />        msg_id = context.get_message_id()<br />        <span class="hljs-keyword">if</span> <span class="hljs-string">'danger'</span> <span class="hljs-keyword">in</span> input:<br />            logger.warn(<span class="hljs-string">"A warning was received in message {0}"</span>.format(context.get_message_id()))<br />        <span class="hljs-keyword">else</span>:<br />            logger.info(<span class="hljs-string">"Message {0} received\nContent: {1}"</span>.format(msg_id, input))<br /></code></pre>
<p>If you want your function to produce logs on a Pulsar topic, you need to specify a <strong>log topic</strong> when creating or running the function. The following is an example.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">functions</span> create \<br />  --py logging_function.py \<br />  --classname logging_function.LoggingFunction \<br />  --<span class="hljs-built_in">log</span>-topic logging-function-logs \<br />  <span class="hljs-comment"># Other function configs</span><br /></code></pre>
<p>All logs produced by <code>LoggingFunction</code> above can be accessed via the <code>logging-function-logs</code> topic.</p>
</span></div></div><div id="tab-group-10547-content-10550" class="tab-pane" data-group="group_10547" tabindex="-1"><div><span><p>The following Go Function example shows different log levels based on the function input.</p>
<pre><code class="hljs">import (<br />    <span class="hljs-string">"context"</span><br /><br />    <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/log"</span><br />    <span class="hljs-string">"github.com/apache/pulsar/pulsar-function-go/pf"</span><br />)<br /><br />func logger<span class="hljs-constructor">Func(<span class="hljs-params">ctx</span> <span class="hljs-params">context</span>.Context, <span class="hljs-params">input</span> []<span class="hljs-params">byte</span>)</span> {<br />    <span class="hljs-keyword">if</span> len(input) &lt;= <span class="hljs-number">100</span> {<br />        log.<span class="hljs-constructor">Infof(<span class="hljs-string">"This input has a length of: %d"</span>, <span class="hljs-params">len</span>(<span class="hljs-params">input</span>)</span>)<br />    } <span class="hljs-keyword">else</span> {<br />        log.<span class="hljs-constructor">Warnf(<span class="hljs-string">"This input is getting too long! It has {%d} characters"</span>, <span class="hljs-params">len</span>(<span class="hljs-params">input</span>)</span>)<br />    }<br />}<br /><br />func main<span class="hljs-literal">()</span> {<br />    pf.<span class="hljs-constructor">Start(<span class="hljs-params">loggerFunc</span>)</span><br />}<br /></code></pre>
<p>When you use <code>logTopic</code> related functionalities in Go Function, import <code>github.com/apache/pulsar/pulsar-function-go/log</code>, and you do not have to use the <code>getLogger()</code> context object.</p>
</span></div></div></div></div>
<h2><a class="anchor" aria-hidden="true" id="metrics"></a><a href="#metrics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Metrics</h2>
<p>Pulsar Functions can publish arbitrary metrics to the metrics interface which can be queried.</p>
<blockquote>
<p>If a Pulsar Function uses the language-native interface for Java or Python, that function is not able to publish metrics and stats to Pulsar.</p>
</blockquote>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10551-tab-10552" class="nav-link active" data-group="group_10551" data-tab="tab-group-10551-content-10552">Java</div><div id="tab-group-10551-tab-10553" class="nav-link" data-group="group_10551" data-tab="tab-group-10551-content-10553">Python</div></div><div class="tab-content"><div id="tab-group-10551-content-10552" class="tab-pane active" data-group="group_10551" tabindex="-1"><div><span><p>You can record metrics using the <a href="#context"><code>Context</code></a> object on a per-key basis. For example, you can set a metric for the <code>process-count</code> key and a different metric for the <code>elevens-count</code> key every time the function processes a message.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Context;<br /><span class="hljs-keyword">import</span> org.apache.pulsar.functions.api.Function;<br /><br /><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MetricRecorderFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">Integer</span>, <span class="hljs-title">Void</span>&gt; </span>{<br />    <span class="hljs-meta">@Override</span><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">apply</span><span class="hljs-params">(Integer input, Context context)</span> </span>{<br />        <span class="hljs-comment">// Records the metric 1 every time a message arrives</span><br />        context.recordMetric(<span class="hljs-string">"hit-count"</span>, <span class="hljs-number">1</span>);<br /><br />        <span class="hljs-comment">// Records the metric only if the arriving number equals 11</span><br />        <span class="hljs-keyword">if</span> (input == <span class="hljs-number">11</span>) {<br />            context.recordMetric(<span class="hljs-string">"elevens-count"</span>, <span class="hljs-number">1</span>);<br />        }<br /><br />        <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br />    }<br />}<br /></code></pre>
<blockquote>
<p>For instructions on reading and using metrics, see the <a href="/docs/en/2.4.1/deploy-monitoring">Monitoring</a> guide.</p>
</blockquote>
</span></div></div><div id="tab-group-10551-content-10553" class="tab-pane" data-group="group_10551" tabindex="-1"><div><span><p>You can record metrics using the <a href="#context"><code>Context</code></a> object on a per-key basis. For example, you can set a metric for the <code>process-count</code> key and a different metric for the <code>elevens-count</code> key every time the function processes a message. The following is an example.</p>
<pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MetricRecorderFunction</span><span class="hljs-params">(Function)</span>:</span><br />    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, input, context)</span>:</span><br />        context.record_metric(<span class="hljs-string">'hit-count'</span>, <span class="hljs-number">1</span>)<br /><br />        <span class="hljs-keyword">if</span> input == <span class="hljs-number">11</span>:<br />            context.record_metric(<span class="hljs-string">'elevens-count'</span>, <span class="hljs-number">1</span>)<br /></code></pre>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="access-metrics"></a><a href="#access-metrics" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Access metrics</h3>
<p>To access metrics created by Pulsar Functions, refer to <a href="/docs/en/2.4.1/deploy-monitoring">Monitoring</a> in Pulsar.</p>
<h2><a class="anchor" aria-hidden="true" id="state-storage"></a><a href="#state-storage" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>State storage</h2>
<p>Pulsar Functions use <a href="https://bookkeeper.apache.org">Apache BookKeeper</a> as a state storage interface. Pulsar installation, including the local standalone installation, includes deployment of BookKeeper bookies.</p>
<p>Since Pulsar 2.1.0 release, Pulsar integrates with Apache BookKeeper <a href="https://docs.google.com/document/d/155xAwWv5IdOitHh1NVMEwCMGgB28M3FyMiQSxEpjE-Y/edit#heading=h.56rbh52koe3f">table service</a> to store the <code>State</code> for functions. For example, a <code>WordCount</code> function can store its <code>counters</code> state into BookKeeper table service via Pulsar Functions State API.</p>
<p>States are key-value pairs, where the key is a string and the value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual Pulsar Function, and shared between instances of that function.</p>
<p>You can access states within Pulsar Java Functions using the <code>putState</code>, <code>putStateAsync</code>, <code>getState</code>, <code>getStateAsync</code>, <code>incrCounter</code>, <code>incrCounterAsync</code>,  <code>getCounter</code>, <code>getCounterAsync</code> and <code>deleteState</code> calls on the context object. You can access states within Pulsar Python Functions using the <code>putState</code>, <code>getState</code>, <code>incrCounter</code>, <code>getCounter</code> and <code>deleteState</code> calls on the context object. You can also manage states using the <a href="#query-state">querystate</a> and <a href="#putstate">putstate</a> options to <code>pulsar-admin functions</code>.</p>
<h3><a class="anchor" aria-hidden="true" id="api"></a><a href="#api" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>API</h3>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10554-tab-10555" class="nav-link active" data-group="group_10554" data-tab="tab-group-10554-content-10555">Java</div><div id="tab-group-10554-tab-10556" class="nav-link" data-group="group_10554" data-tab="tab-group-10554-content-10556">Python</div></div><div class="tab-content"><div id="tab-group-10554-content-10555" class="tab-pane active" data-group="group_10554" tabindex="-1"><div><span><p>Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the <a href="/docs/en/2.4.1/functions-develop#context">Context</a> object when you are using Java SDK functions.</p>
<h4><a class="anchor" aria-hidden="true" id="incrcounter"></a><a href="#incrcounter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>incrCounter</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Increment the builtin distributed counter referred by key<br />     * <span class="hljs-doctag">@param</span> key The name of the key<br />     * <span class="hljs-doctag">@param</span> amount The amount to be incremented<br />     */</span><br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">incrCounter</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br /></code></pre>
<p>The application can use <code>incrCounter</code> to change the counter of a given <code>key</code> by the given <code>amount</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="incrcounterasync"></a><a href="#incrcounterasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>incrCounterAsync</h4>
<pre><code class="hljs css language-java">     <span class="hljs-comment">/**<br />     * Increment the builtin distributed counter referred by key<br />     * but dont wait for the completion of the increment operation<br />     *<br />     * <span class="hljs-doctag">@param</span> key The name of the key<br />     * <span class="hljs-doctag">@param</span> amount The amount to be incremented<br />     */</span><br />    <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">incrCounterAsync</span><span class="hljs-params">(String key, <span class="hljs-keyword">long</span> amount)</span></span>;<br /></code></pre>
<p>The application can use <code>incrCounterAsync</code> to asynchronously change the counter of a given <code>key</code> by the given <code>amount</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="getcounter"></a><a href="#getcounter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getCounter</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Retrieve the counter value for the key.<br />     *<br />     * <span class="hljs-doctag">@param</span> key name of the key<br />     * <span class="hljs-doctag">@return</span> the amount of the counter value for this key<br />     */</span><br />    <span class="hljs-function"><span class="hljs-keyword">long</span> <span class="hljs-title">getCounter</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<p>The pplication can use <code>getCounter</code> to retrieve the counter of a given <code>key</code> mutated by <code>incrCounter</code>.</p>
<p>Except the <code>counter</code> API, Pulsar also exposes a general key/value API for functions to store
general key/value state.</p>
<h4><a class="anchor" aria-hidden="true" id="getcounterasync"></a><a href="#getcounterasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getCounterAsync</h4>
<pre><code class="hljs css language-java">     <span class="hljs-comment">/**<br />     * Retrieve the counter value for the key, but don't wait<br />     * for the operation to be completed<br />     *<br />     * <span class="hljs-doctag">@param</span> key name of the key<br />     * <span class="hljs-doctag">@return</span> the amount of the counter value for this key<br />     */</span><br />    <span class="hljs-function">CompletableFuture&lt;Long&gt; <span class="hljs-title">getCounterAsync</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<p>The application can use <code>getCounterAsync</code> to asynchronously retrieve the counter of a given <code>key</code> mutated by <code>incrCounterAsync</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="putstate"></a><a href="#putstate" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>putState</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Update the state value for the key.<br />     *<br />     * <span class="hljs-doctag">@param</span> key name of the key<br />     * <span class="hljs-doctag">@param</span> value state value of the key<br />     */</span><br />    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">putState</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="putstateasync"></a><a href="#putstateasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>putStateAsync</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Update the state value for the key, but don't wait for the operation to be completed<br />     *<br />     * <span class="hljs-doctag">@param</span> key name of the key<br />     * <span class="hljs-doctag">@param</span> value state value of the key<br />     */</span><br />    <span class="hljs-function">CompletableFuture&lt;Void&gt; <span class="hljs-title">putStateAsync</span><span class="hljs-params">(String key, ByteBuffer value)</span></span>;<br /></code></pre>
<p>The application can use <code>putStateAsync</code> to asynchronously update the state of a given <code>key</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="getstate"></a><a href="#getstate" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getState</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Retrieve the state value for the key.<br />     *<br />     * <span class="hljs-doctag">@param</span> key name of the key<br />     * <span class="hljs-doctag">@return</span> the state value for the key.<br />     */</span><br />    <span class="hljs-function">ByteBuffer <span class="hljs-title">getState</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="getstateasync"></a><a href="#getstateasync" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>getStateAsync</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Retrieve the state value for the key, but don't wait for the operation to be completed<br />     *<br />     * <span class="hljs-doctag">@param</span> key name of the key<br />     * <span class="hljs-doctag">@return</span> the state value for the key.<br />     */</span><br />    <span class="hljs-function">CompletableFuture&lt;ByteBuffer&gt; <span class="hljs-title">getStateAsync</span><span class="hljs-params">(String key)</span></span>;<br /></code></pre>
<p>The application can use <code>getStateAsync</code> to asynchronously retrieve the state of a given <code>key</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="deletestate"></a><a href="#deletestate" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>deleteState</h4>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**<br />     * Delete the state value for the key.<br />     *<br />     * <span class="hljs-doctag">@param</span> key   name of the key<br />     */</span><br /></code></pre>
<p>Counters and binary values share the same keyspace, so this deletes either type.</p>
</span></div></div><div id="tab-group-10554-content-10556" class="tab-pane" data-group="group_10554" tabindex="-1"><div><span><p>Currently Pulsar Functions expose the following APIs for mutating and accessing State. These APIs are available in the <a href="#context">Context</a> object when you are using Python SDK functions.</p>
<h4><a class="anchor" aria-hidden="true" id="incr_counter"></a><a href="#incr_counter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>incr_counter</h4>
<pre><code class="hljs css language-python">  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">incr_counter</span><span class="hljs-params">(self, key, amount)</span>:</span><br />    <span class="hljs-string">"""incr the counter of a given key in the managed state"""</span><br /></code></pre>
<p>Application can use <code>incr_counter</code> to change the counter of a given <code>key</code> by the given <code>amount</code>.
If the <code>key</code> does not exist, a new key is created.</p>
<h4><a class="anchor" aria-hidden="true" id="get_counter"></a><a href="#get_counter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>get_counter</h4>
<pre><code class="hljs css language-python">  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_counter</span><span class="hljs-params">(self, key)</span>:</span><br />    <span class="hljs-string">"""get the counter of a given key in the managed state"""</span><br /></code></pre>
<p>Application can use <code>get_counter</code> to retrieve the counter of a given <code>key</code> mutated by <code>incrCounter</code>.</p>
<p>Except the <code>counter</code> API, Pulsar also exposes a general key/value API for functions to store
general key/value state.</p>
<h4><a class="anchor" aria-hidden="true" id="put_state"></a><a href="#put_state" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>put_state</h4>
<pre><code class="hljs css language-python">  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">put_state</span><span class="hljs-params">(self, key, value)</span>:</span><br />    <span class="hljs-string">"""update the value of a given key in the managed state"""</span><br /></code></pre>
<p>The key is a string, and the value is arbitrary binary data.</p>
<h4><a class="anchor" aria-hidden="true" id="get_state"></a><a href="#get_state" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>get_state</h4>
<pre><code class="hljs css language-python">  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">get_state</span><span class="hljs-params">(self, key)</span>:</span><br />    <span class="hljs-string">"""get the value of a given key in the managed state"""</span><br /></code></pre>
<h4><a class="anchor" aria-hidden="true" id="del_counter"></a><a href="#del_counter" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>del_counter</h4>
<pre><code class="hljs css language-python">  <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">del_counter</span><span class="hljs-params">(self, key)</span>:</span><br />    <span class="hljs-string">"""delete the counter of a given key in the managed state"""</span><br /></code></pre>
<p>Counters and binary values share the same keyspace, so this deletes either type.</p>
</span></div></div></div></div>
<h3><a class="anchor" aria-hidden="true" id="query-state"></a><a href="#query-state" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Query State</h3>
<p>A Pulsar Function can use the <a href="#api">State API</a> for storing state into Pulsar's state storage
and retrieving state back from Pulsar's state storage. Additionally Pulsar also provides
CLI commands for querying its state.</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> bin/pulsar-admin <span class="hljs-built_in">functions</span> querystate \</span>
    --tenant &lt;tenant&gt; \
    --namespace &lt;namespace&gt; \
    --name &lt;function-name&gt; \
    --state-storage-url &lt;bookkeeper-service-url&gt; \
    --key &lt;state-key&gt; \
    [---watch]
</code></pre>
<p>If <code>--watch</code> is specified, the CLI will watch the value of the provided <code>state-key</code>.</p>
<h3><a class="anchor" aria-hidden="true" id="example-1"></a><a href="#example-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example</h3>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-10557-tab-10558" class="nav-link active" data-group="group_10557" data-tab="tab-group-10557-content-10558">Java</div><div id="tab-group-10557-tab-10559" class="nav-link" data-group="group_10557" data-tab="tab-group-10557-content-10559">Python</div></div><div class="tab-content"><div id="tab-group-10557-content-10558" class="tab-pane active" data-group="group_10557" tabindex="-1"><div><span><p><a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java"><code>WordCountFunction</code></a>
 is a very good example
demonstrating on how Application can easily store <code>state</code> in Pulsar Functions.</p>
<pre><code class="hljs css language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordCountFunction</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Function</span>&lt;<span class="hljs-title">String</span>, <span class="hljs-title">Void</span>&gt; </span>{<br />    <span class="hljs-meta">@Override</span><br />    <span class="hljs-function"><span class="hljs-keyword">public</span> Void <span class="hljs-title">process</span><span class="hljs-params">(String input, Context context)</span> <span class="hljs-keyword">throws</span> Exception </span>{<br />        Arrays.asList(input.split(<span class="hljs-string">"\\."</span>)).forEach(word -&gt; context.incrCounter(word, <span class="hljs-number">1</span>));<br />        <span class="hljs-keyword">return</span> <span class="hljs-keyword">null</span>;<br />    }<br />}<br /></code></pre>
<p>The logic of this <code>WordCount</code> function is pretty simple and straightforward:</p>
<ol>
<li>The function first splits the received <code>String</code> into multiple words using regex <code>\\.</code>.</li>
<li>For each <code>word</code>, the function increments the corresponding <code>counter</code> by 1 (via <code>incrCounter(key, amount)</code>).</li>
</ol>
</span></div></div><div id="tab-group-10557-content-10559" class="tab-pane" data-group="group_10557" tabindex="-1"><div><span><pre><code class="hljs css language-python"><span class="hljs-keyword">from</span> pulsar <span class="hljs-keyword">import</span> Function<br /><br /><span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordCount</span><span class="hljs-params">(Function)</span>:</span><br />    <span class="hljs-function"><span class="hljs-keyword">def</span> <span class="hljs-title">process</span><span class="hljs-params">(self, item, context)</span>:</span><br />        <span class="hljs-keyword">for</span> word <span class="hljs-keyword">in</span> item.split():<br />            context.incr_counter(word, <span class="hljs-number">1</span>)<br /></code></pre>
<p>The logic of this <code>WordCount</code> function is pretty simple and straightforward:</p>
<ol>
<li>The function first splits the received string into multiple words on space.</li>
<li>For each <code>word</code>, the function increments the corresponding <code>counter</code> by 1 (via <code>incr_counter(key, amount)</code>).</li>
</ol>
</span></div></div></div></div>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.4.1/functions-runtime"><span class="arrow-prev">← </span><span>Setup: Configure Functions runtime</span></a><a class="docs-next button" href="/docs/en/2.4.1/functions-debug"><span>How-to: Debug</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#available-apis">Available APIs</a></li><li><a href="#schema-registry">Schema registry</a></li><li><a href="#serde">SerDe</a><ul class="toc-headings"><li><a href="#example">Example</a></li></ul></li><li><a href="#context">Context</a><ul class="toc-headings"><li><a href="#user-config">User config</a></li><li><a href="#logger">Logger</a></li></ul></li><li><a href="#metrics">Metrics</a><ul class="toc-headings"><li><a href="#access-metrics">Access metrics</a></li></ul></li><li><a href="#state-storage">State storage</a><ul class="toc-headings"><li><a href="#api">API</a></li><li><a href="#query-state">Query State</a></li><li><a href="#example-1">Example</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
      const community = document.querySelector("a[href='#community']").parentNode;
      const communityMenu =
        '<li>' +
        '<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
        '<div id="community-dropdown" class="hide">' +
          '<ul id="community-dropdown-items">' +
            '<li><a href="/en/contact">Contact</a></li>' +
            '<li><a href="/en/contributing">Contributing</a></li>' +
            '<li><a href="/en/coding-guide">Coding guide</a></li>' +
            '<li><a href="/en/events">Events</a></li>' +
            '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
            '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
            '<li>&nbsp;</li>' +
            '<li><a href="/en/resources">Resources</a></li>' +
            '<li><a href="/en/team">Team</a></li>' +
            '<li><a href="/en/powered-by">Powered By</a></li>' +
          '</ul>' +
        '</div>' +
        '</li>';

      community.innerHTML = communityMenu;

      const communityMenuItem = document.getElementById("community-menu");
      const communityDropDown = document.getElementById("community-dropdown");
      communityMenuItem.addEventListener("click", function(event) {
        event.preventDefault();

        if (communityDropDown.className == 'hide') {
          communityDropDown.className = 'visible';
        } else {
          communityDropDown.className = 'hide';
        }
      });
    </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>