<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>How to develop Pulsar connectors · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This guide describes how to develop Pulsar connectors to move data"/><meta name="docsearch:version" content="2.8.2"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="How to develop Pulsar connectors · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This guide describes how to develop Pulsar connectors to move data"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache 
Pulsar"/></a><a href="/en/versions"><h3>2.8.2</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.8.2/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.8.2/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.8.2/io-develop">日本語</a></li><li><a href="/docs/fr/2.8.2/io-develop">Français</a></li><li><a href="/docs/ko/2.8.2/io-develop">한국어</a></li><li><a href="/docs/zh-CN/2.8.2/io-develop">中文</a></li><li><a href="/docs/zh-TW/2.8.2/io-develop">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
        // Language switcher: a click on the menu entry swaps the dropdown's
        // class attribute between "hide" and "visible".
        const languagesMenuItem = document.getElementById("languages-menu");
        const languagesDropDown = document.getElementById("languages-dropdown");
        languagesMenuItem.addEventListener("click", (event) => {
          // The menu entry is an <a href="#">; suppress the jump to the page top.
          event.preventDefault();

          const isHidden = languagesDropDown.className == "hide";
          languagesDropDown.className = isHidden ? "visible" : "hide";
        });
      </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Pulsar IO</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-topic-compaction">Topic 
Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 
class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-cdc">CDC connector</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.8.2/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/tiered-storage-azure">Azure BlobStore 
offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/tiered-storage-aliyun">Aliyun OSS offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.8.2/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.8.2/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-extending">Extending</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/client-libraries-dotnet">C#</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-overview">Overview</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-message-queue">Message 
queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/develop-load-manager">Modular load manager</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.8.2/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
            // Sidebar categories: every element with class "collapsible" is a
            // header whose next sibling holds that category's content.
            var coll = document.getElementsByClassName('collapsible');
            // Stays true until the first category containing the active item is found.
            var checkActiveCategory = true;
            for (var i = 0; i < coll.length; i++) {
              var links = coll[i].nextElementSibling.getElementsByTagName('*');

              // Toggle the classes on the first category that contains the
              // element marked "navListItemActive"; later categories are skipped.
              for (var j = 0; checkActiveCategory && j < links.length; j++) {
                if (!links[j].classList.contains('navListItemActive')) {
                  continue;
                }
                coll[i].nextElementSibling.classList.toggle('hide');
                coll[i].childNodes[1].classList.toggle('rotate');
                checkActiveCategory = false;
                break;
              }

              // Clicking a header flips its arrow and shows/hides its content.
              coll[i].addEventListener('click', function() {
                this.childNodes[1].classList.toggle('rotate');
                this.nextElementSibling.classList.toggle('hide');
              });
            }

            // Once the DOM is parsed, wire up the two mobile togglers and make
            // the on-page table of contents close itself when a link is clicked.
            document.addEventListener('DOMContentLoaded', function() {
              createToggler('#navToggler', '#docsNav', 'docsSliderActive');
              createToggler('#tocToggler', 'body', 'tocActive');

              var headings = document.querySelector('.toc-headings');
              headings && headings.addEventListener('click', function(event) {
                // Walk up from the clicked node towards the container; if an
                // anchor is found on the way, remove the "tocActive" class.
                var el = event.target;
                // The `el &&` guard stops the walk if the chain of parents
                // ends before reaching `headings`, instead of throwing on null.
                while (el && el !== headings) {
                  if (el.tagName === 'A') {
                    document.body.classList.remove('tocActive');
                    break;
                  } else {
                    el = el.parentNode;
                  }
                }
              }, false);

              /**
               * Makes a click on one element toggle a class on another.
               *
               * @param {string} togglerSelector CSS selector of the clickable element.
               * @param {string} targetSelector CSS selector of the element whose class is toggled.
               * @param {string} className Class name to toggle on the target.
               */
              function createToggler(togglerSelector, targetSelector, className) {
                var toggler = document.querySelector(togglerSelector);
                var target = document.querySelector(targetSelector);

                if (!toggler) {
                  return;
                }

                toggler.onclick = function(event) {
                  event.preventDefault();

                  // querySelector returns null when nothing matches; skip the
                  // toggle in that case rather than raising a TypeError.
                  if (target) {
                    target.classList.toggle(className);
                  }
                };
              }
            });
        </script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-develop.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">How to develop Pulsar connectors</h1></header><article><div><span><p>This guide describes how to develop Pulsar connectors to move data
between Pulsar and other systems.</p>
<p>Pulsar connectors are special <a href="/docs/en/2.8.2/functions-overview">Pulsar Functions</a>, so creating
a Pulsar connector is similar to creating a Pulsar function.</p>
<p>Pulsar connectors come in two types:</p>
<table>
<thead>
<tr><th>Type</th><th>Description</th><th>Example</th></tr>
</thead>
<tbody>
<tr><td><a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>Source</code></a>
</td><td>Import data from another system to Pulsar.</td><td><a href="/docs/en/2.8.2/io-rabbitmq">RabbitMQ source connector</a> imports the messages of a RabbitMQ queue to a Pulsar topic.</td></tr>
<tr><td><a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>Sink</code></a>
</td><td>Export data from Pulsar to another system.</td><td><a href="/docs/en/2.8.2/io-kinesis">Kinesis sink connector</a> exports the messages of a Pulsar topic to a Kinesis stream.</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="develop"></a><a href="#develop" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Develop</h2>
<p>You can develop Pulsar source connectors and sink connectors.</p>
<h3><a class="anchor" aria-hidden="true" id="source"></a><a href="#source" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Source</h3>
<p>To develop a source connector, implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>Source</code></a>

interface, which means you need to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>open</code></a>
 method and the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>read</code></a>
 method.</p>
<ol>
<li><p>Implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>open</code></a>
 method.</p>
<pre><code class="hljs css language-java"><span class="hljs-comment">/**
* Open connector with configuration
*
* <span class="hljs-doctag">@param</span> config initialization config
* <span class="hljs-doctag">@param</span> sourceContext
* <span class="hljs-doctag">@throws</span> Exception IO type exceptions when opening a connector
*/</span>
<span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(<span class="hljs-keyword">final</span> Map&lt;String, Object&gt; config, SourceContext sourceContext)</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>This method is called when the source connector is initialized.</p>
<p>In this method, you can retrieve all connector specific settings through the passed-in <code>config</code> parameter and initialize all necessary resources.</p>
<p>For example, a Kafka connector can create a Kafka client in this <code>open</code> method.</p>
<p>In addition, the Pulsar runtime provides a <code>SourceContext</code> for the
connector to access runtime resources for tasks like collecting metrics. The implementation can save the <code>SourceContext</code> for future use.</p></li>
<li><p>Implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>read</code></a>
 method.</p>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**
    * Reads the next message from source.
    * If source does not have any new messages, this call should block.
    * <span class="hljs-doctag">@return</span> next message from source.  The return result should never be null
    * <span class="hljs-doctag">@throws</span> Exception
    */</span>
    <span class="hljs-function">Record&lt;T&gt; <span class="hljs-title">read</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>If there is nothing to return, the implementation should block rather than return <code>null</code>.</p>
<p>The returned <a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java"><code>Record</code></a>
 should encapsulate the following information, which is needed by Pulsar IO runtime.</p>
<ul>
<li><p><a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java"><code>Record</code></a>
 should provide the following variables:</p>
<table>
<thead>
<tr><th>Variable</th><th>Required</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>TopicName</code></td><td>No</td><td>Name of the Pulsar topic from which the record originated.</td></tr>
<tr><td><code>Key</code></td><td>No</td><td>Messages can optionally be tagged with keys.<br/><br/>For more information, see <a href="/docs/en/2.8.2/concepts-messaging#routing-modes">Routing modes</a>.</td></tr>
<tr><td><code>Value</code></td><td>Yes</td><td>Actual data of the record.</td></tr>
<tr><td><code>EventTime</code></td><td>No</td><td>Event time of the record from the source.</td></tr>
<tr><td><code>PartitionId</code></td><td>No</td><td>If the record originates from a partitioned source, it returns its <code>PartitionId</code>. <br/><br/><code>PartitionId</code> is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve an exactly-once processing guarantee.</td></tr>
<tr><td><code>RecordSequence</code></td><td>No</td><td>If the record originates from a sequential source, it returns its <code>RecordSequence</code>.<br/><br/><code>RecordSequence</code> is used as a part of the unique identifier by Pulsar IO runtime to deduplicate messages and achieve an exactly-once processing guarantee.</td></tr>
<tr><td><code>Properties</code></td><td>No</td><td>If the record carries user-defined properties, it returns those properties.</td></tr>
<tr><td><code>DestinationTopic</code></td><td>No</td><td>Topic to which message should be written.</td></tr>
<tr><td><code>Message</code></td><td>No</td><td>A class which carries data sent by users.<br/><br/>For more information, see <a href="https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java">Message.java</a>.</td></tr>
</tbody>
</table>
</li>
<li><p><a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java"><code>Record</code></a>
 should provide the following methods:</p>
<table>
<thead>
<tr><th>Method</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>ack</code></td><td>Acknowledge that the record is fully processed.</td></tr>
<tr><td><code>fail</code></td><td>Indicate that the record fails to be processed.</td></tr>
</tbody>
</table>
</li>
</ul></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="handle-schema-information"></a><a href="#handle-schema-information" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Handle schema information</h2>
<p>Pulsar IO automatically handles the schema and provides a strongly typed API based on Java generics.
If you know the schema type that you are producing, you can declare the Java class relative to that type in your source declaration.</p>
<pre><code class="hljs"><span class="hljs-keyword">public</span> <span class="hljs-keyword">class</span> <span class="hljs-symbol">MySource</span> <span class="hljs-symbol">implements</span> <span class="hljs-symbol">Source</span>&lt;<span class="hljs-symbol">String</span>&gt; {
    <span class="hljs-keyword">public</span> Record&lt;String&gt; read() {}
}
</code></pre>
<p>If you want to implement a source that works with any schema, you can go with <code>byte[]</code> (or <code>ByteBuffer</code>) and use <code>Schema.AUTO_PRODUCE_BYTES()</code>.</p>
<pre><code class="hljs"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MySource</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Source</span>&lt;<span class="hljs-title">byte</span>[]&gt; </span>{
    <span class="hljs-keyword">public</span> Record&lt;<span class="hljs-keyword">byte</span>[]&gt; read() {
        
        Schema wantedSchema = ....
        Record&lt;<span class="hljs-keyword">byte</span>[]&gt; myRecord = <span class="hljs-keyword">new</span> MyRecordImplementation(); 
        ....
    }
    <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">MyRecordImplementation</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Record</span>&lt;<span class="hljs-title">byte</span>[]&gt; </span>{
         <span class="hljs-keyword">public</span> <span class="hljs-keyword">byte</span>[] getValue() {
            <span class="hljs-keyword">return</span> ....encoded <span class="hljs-keyword">byte</span>[]...that represents the value 
         }
         <span class="hljs-keyword">public</span> Schema&lt;<span class="hljs-keyword">byte</span>[]&gt; getSchema() {
             <span class="hljs-keyword">return</span> Schema.AUTO_PRODUCE_BYTES(wantedSchema);
         }
    }
}
</code></pre>
<p>To handle the <code>KeyValue</code> type properly, follow the guidelines for your record implementation:</p>
<ul>
<li>It must implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java"><code>KVRecord</code></a>
 interface and implement <code>getKeySchema</code>, <code>getValueSchema</code>, and <code>getKeyValueEncodingType</code></li>
<li>It must return a <code>KeyValue</code> object as <code>Record.getValue()</code></li>
<li>It may return null in <code>Record.getSchema()</code></li>
</ul>
<p>When Pulsar IO runtime encounters a <code>KVRecord</code>, it brings the following changes automatically:</p>
<ul>
<li>Set the <code>KeyValueSchema</code> properly</li>
<li>Encode the Message Key and the Message Value according to the <code>KeyValueEncoding</code> (SEPARATED or INLINE)</li>
</ul>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip"></a><a href="#tip" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <strong>how to create a source connector</strong>, see <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java"><code>KafkaSource</code></a>
.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="sink"></a><a href="#sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Sink</h3>
<p>Developing a sink connector <strong>is similar to</strong> developing a source connector, that is, you need to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>Sink</code></a>
 interface, which means implementing the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>open</code></a>
 method and the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>write</code></a>
 method.</p>
<ol>
<li><p>Implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>open</code></a>
 method.</p>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**
    * Open connector with configuration
    *
    * <span class="hljs-doctag">@param</span> config initialization config
    * <span class="hljs-doctag">@param</span> sinkContext
    * <span class="hljs-doctag">@throws</span> Exception IO type exceptions when opening a connector
    */</span>
    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(<span class="hljs-keyword">final</span> Map&lt;String, Object&gt; config, SinkContext sinkContext)</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre></li>
<li><p>Implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>write</code></a>
 method.</p>
<pre><code class="hljs css language-java">    <span class="hljs-comment">/**
    * Write a message to Sink
    * <span class="hljs-doctag">@param</span> record record to write to sink
    * <span class="hljs-doctag">@throws</span> Exception
    */</span>
    <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">write</span><span class="hljs-params">(Record&lt;T&gt; record)</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>During the implementation, you can decide how to write the <code>Value</code> and
the <code>Key</code> to the actual sink, and leverage all the provided information such as
<code>PartitionId</code> and <code>RecordSequence</code> to achieve different processing guarantees.</p>
<p>You also need to ack records (if messages are sent successfully) or fail records (if messages fail to send).</p></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="handling-schema-information"></a><a href="#handling-schema-information" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Handling Schema information</h2>
<p>Pulsar IO automatically handles the schema and provides a strongly typed API based on Java generics.
If you know the schema type that you are consuming from, you can declare the Java class relative to that type in your sink declaration.</p>
<pre><code class="hljs"><span class="hljs-keyword">public</span> <span class="hljs-keyword">class</span> <span class="hljs-symbol">MySink</span> <span class="hljs-symbol">implements</span> <span class="hljs-symbol">Sink</span>&lt;<span class="hljs-symbol">String</span>&gt; {
    <span class="hljs-keyword">public</span> <span class="hljs-built_in">void</span> write(Record&lt;String&gt; record) {}
}
</code></pre>
<p>If you want to implement a sink that works with any schema, you can go with the special <code>GenericObject</code> interface.</p>
<pre><code class="hljs">public class MySink implements Sink&lt;GenericObject&gt; {
    public void write(Record&lt;GenericObject&gt; record) {
        Schema schema = record.getSchema();
        GenericObject genericObject = record.getValue();
        <span class="hljs-keyword">if</span> (genericObject != <span class="hljs-literal">null</span>) {
            SchemaType<span class="hljs-built_in"> type </span>= genericObject.getSchemaType();
            Object nativeObject = genericObject.getNativeObject();
            <span class="hljs-built_in">..</span>.
        }
        <span class="hljs-built_in">..</span><span class="hljs-built_in">..</span>
    }
}
</code></pre>
<p>In the case of AVRO, JSON, and Protobuf records (schemaType=AVRO,JSON,PROTOBUF_NATIVE), you can cast the
<code>genericObject</code> variable to <code>GenericRecord</code> and use the <code>getFields()</code> and <code>getField()</code> APIs.
You are able to access the native AVRO record using <code>genericObject.getNativeObject()</code>.</p>
<p>In the case of KeyValue type, you can access both the schema for the key and the schema for the value using this code.</p>
<pre><code class="hljs"><span class="hljs-title">public</span> <span class="hljs-keyword">class</span> <span class="hljs-type">MySink</span> implements <span class="hljs-type">Sink</span>&lt;<span class="hljs-type">GenericObject</span>&gt; {
    public void write(<span class="hljs-type">Record</span>&lt;<span class="hljs-type">GenericObject</span>&gt; record) {
        <span class="hljs-type">Schema</span> schema = record.getSchema();
        <span class="hljs-type">GenericObject</span> genericObject = record.getValue();
        <span class="hljs-type">SchemaType</span> <span class="hljs-class"><span class="hljs-keyword">type</span> = genericObject.getSchemaType();</span>
        <span class="hljs-type">Object</span> nativeObject = genericObject.getNativeObject();
        <span class="hljs-keyword">if</span> (<span class="hljs-class"><span class="hljs-keyword">type</span> == <span class="hljs-type">SchemaType</span>.<span class="hljs-type">KEY_VALUE</span>) {
            <span class="hljs-type">KeyValue</span> <span class="hljs-title">keyValue</span> = (<span class="hljs-type">KeyValue</span>) <span class="hljs-title">nativeObject</span>;
            <span class="hljs-type">Object</span> <span class="hljs-title">key</span> = <span class="hljs-title">keyValue</span>.<span class="hljs-title">getKey</span>();
            <span class="hljs-type">Object</span> <span class="hljs-title">value</span> = <span class="hljs-title">keyValue</span>.<span class="hljs-title">getValue</span>();
        
            <span class="hljs-type">KeyValueSchema</span> <span class="hljs-title">keyValueSchema</span> = (<span class="hljs-type">KeyValueSchema</span>) <span class="hljs-title">schema</span>;
            <span class="hljs-type">Schema</span> <span class="hljs-title">keySchema</span> = <span class="hljs-title">keyValueSchema</span>.<span class="hljs-title">getKeySchema</span>();
            <span class="hljs-type">Schema</span> <span class="hljs-title">valueSchema</span> = <span class="hljs-title">keyValueSchema</span>.<span class="hljs-title">getValueSchema</span>();
        }</span>
        ....
    }
}
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="test"></a><a href="#test" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Test</h2>
<p>Testing connectors can be challenging because Pulsar IO connectors interact with two systems
that may be difficult to mock—Pulsar and the system to which the connector is connecting.</p>
<p>It is
recommended to write special tests to test the connector functionalities as below
while mocking the external service.</p>
<h3><a class="anchor" aria-hidden="true" id="unit-test"></a><a href="#unit-test" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Unit test</h3>
<p>You can create unit tests for your connector.</p>
<h3><a class="anchor" aria-hidden="true" id="integration-test"></a><a href="#integration-test" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Integration test</h3>
<p>Once you have written sufficient unit tests, you can add
separate integration tests to verify end-to-end functionality.</p>
<p>Pulsar uses
<a href="https://www.testcontainers.org/">testcontainers</a> <strong>for all integration tests</strong>.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-1"></a><a href="#tip-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <strong>how to create integration tests for Pulsar connectors</strong>, see <a href="https://github.com/apache/pulsar/tree/master//tests/integration/src/test/java/org/apache/pulsar/tests/integration/io"><code>IntegrationTests</code></a>
.</p>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="package"></a><a href="#package" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Package</h2>
<p>Once you've developed and tested your connector, you need to package it so that it can be submitted
to a <a href="/docs/en/2.8.2/functions-overview">Pulsar Functions</a> cluster.</p>
<p>There are two methods to
work with Pulsar Functions' runtime, that is, <a href="#nar">NAR</a> and <a href="#uber-jar">uber JAR</a>.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="note"></a><a href="#note" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Note</h4>
<p>If you plan to package and distribute your connector for others to use, you are obligated to
license and copyright your own code properly. Remember to add the license and copyright to
all libraries your code uses and to your distribution.</p>
<p>If you use the <a href="#nar">NAR</a> method, the NAR plugin
automatically creates a <code>DEPENDENCIES</code> file in the generated NAR package, including the proper
licensing and copyrights of all libraries of your connector.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="nar"></a><a href="#nar" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>NAR</h3>
<p><strong>NAR</strong> stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide
a bit of Java ClassLoader isolation.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-2"></a><a href="#tip-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <strong>how NAR works</strong>, see
<a href="https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd">here</a>.</p>
</blockquote>
<p>Pulsar uses the same mechanism for packaging <strong>all</strong> <a href="io-connectors">built-in connectors</a>.</p>
<p>The easiest approach to package a Pulsar connector is to create a NAR package using
<a href="https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin">nifi-nar-maven-plugin</a>.</p>
<p>Include this <a href="https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin">nifi-nar-maven-plugin</a> in your maven project for your connector as below.</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">plugins</span>&gt;</span>
  <span class="hljs-tag">&lt;<span class="hljs-name">plugin</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.nifi<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>nifi-nar-maven-plugin<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>1.2.0<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
  <span class="hljs-tag">&lt;/<span class="hljs-name">plugin</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugins</span>&gt;</span>
</code></pre>
<p>You must also create a <code>resources/META-INF/services/pulsar-io.yaml</code> file with the following contents:</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">name:</span> <span class="hljs-string">connector</span> <span class="hljs-string">name</span>
<span class="hljs-attr">description:</span> <span class="hljs-string">connector</span> <span class="hljs-string">description</span>
<span class="hljs-attr">sourceClass:</span> <span class="hljs-string">fully</span> <span class="hljs-string">qualified</span> <span class="hljs-string">class</span> <span class="hljs-string">name</span> <span class="hljs-string">(only</span> <span class="hljs-string">if</span> <span class="hljs-string">source</span> <span class="hljs-string">connector)</span>
<span class="hljs-attr">sinkClass:</span> <span class="hljs-string">fully</span> <span class="hljs-string">qualified</span> <span class="hljs-string">class</span> <span class="hljs-string">name</span> <span class="hljs-string">(only</span> <span class="hljs-string">if</span> <span class="hljs-string">sink</span> <span class="hljs-string">connector)</span>
</code></pre>
<p>For Gradle users, there is a <a href="https://plugins.gradle.org/plugin/io.github.lhotari.gradle-nar-plugin">Gradle Nar plugin available on the Gradle Plugin Portal</a>.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-3"></a><a href="#tip-3" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <strong>how to use NAR for Pulsar connectors</strong>, see <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/twitter/pom.xml"><code>TwitterFirehose</code></a>
.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="uber-jar"></a><a href="#uber-jar" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Uber JAR</h3>
<p>An alternative approach is to create an <strong>uber JAR</strong> that contains all of the connector's JAR files
and other resource files. No internal directory structure is necessary.</p>
<p>You can use <a href="https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html">maven-shade-plugin</a> to create an uber JAR as below:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">plugin</span>&gt;</span>
  <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.maven.plugins<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
  <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>maven-shade-plugin<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
  <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>3.1.1<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
  <span class="hljs-tag">&lt;<span class="hljs-name">executions</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">execution</span>&gt;</span>
      <span class="hljs-tag">&lt;<span class="hljs-name">phase</span>&gt;</span>package<span class="hljs-tag">&lt;/<span class="hljs-name">phase</span>&gt;</span>
      <span class="hljs-tag">&lt;<span class="hljs-name">goals</span>&gt;</span>
        <span class="hljs-tag">&lt;<span class="hljs-name">goal</span>&gt;</span>shade<span class="hljs-tag">&lt;/<span class="hljs-name">goal</span>&gt;</span>
      <span class="hljs-tag">&lt;/<span class="hljs-name">goals</span>&gt;</span>
      <span class="hljs-tag">&lt;<span class="hljs-name">configuration</span>&gt;</span>
        <span class="hljs-tag">&lt;<span class="hljs-name">filters</span>&gt;</span>
          <span class="hljs-tag">&lt;<span class="hljs-name">filter</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">artifact</span>&gt;</span>*:*<span class="hljs-tag">&lt;/<span class="hljs-name">artifact</span>&gt;</span>
          <span class="hljs-tag">&lt;/<span class="hljs-name">filter</span>&gt;</span>
        <span class="hljs-tag">&lt;/<span class="hljs-name">filters</span>&gt;</span>
      <span class="hljs-tag">&lt;/<span class="hljs-name">configuration</span>&gt;</span>
    <span class="hljs-tag">&lt;/<span class="hljs-name">execution</span>&gt;</span>
  <span class="hljs-tag">&lt;/<span class="hljs-name">executions</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugin</span>&gt;</span>
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="monitor"></a><a href="#monitor" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Monitor</h2>
<p>Pulsar connectors enable you to move data in and out of Pulsar easily. It is important to ensure that the running connectors are healthy at any time. You can monitor Pulsar connectors that have been deployed with the following methods:</p>
<ul>
<li><p>Check the metrics provided by Pulsar.</p>
<p>Pulsar connectors expose the metrics that can be collected and used for monitoring the health of <strong>Java</strong> connectors. You can check the metrics by following the <a href="/docs/en/2.8.2/deploy-monitoring">monitoring</a> guide.</p></li>
<li><p>Set and check your customized metrics.</p>
<p>In addition to the metrics provided by Pulsar, Pulsar allows you to customize metrics for <strong>Java</strong> connectors. Function workers collect user-defined metrics to Prometheus automatically and you can check them in Grafana.</p></li>
</ul>
<p>Here is an example of how to customize metrics for a Java connector.</p>
<div class="tabs"><div class="nav-tabs"><div id="tab-group-4416-tab-4417" class="nav-link active" data-group="group_4416" data-tab="tab-group-4416-content-4417">Java</div></div><div class="tab-content"><div id="tab-group-4416-content-4417" class="tab-pane active" data-group="group_4416" tabindex="-1"><div><span><pre><code class="hljs"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TestMetricSink</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Sink</span>&lt;<span class="hljs-title">String</span>&gt; </span>{<br /><br />        <span class="hljs-meta">@Override</span><br />        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(Map&lt;String, Object&gt; config, SinkContext sinkContext)</span> <span class="hljs-keyword">throws</span> Exception </span>{<br />            sinkContext.recordMetric(<span class="hljs-string">"foo"</span>, <span class="hljs-number">1</span>);<br />        }<br /><br />        <span class="hljs-meta">@Override</span><br />        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">write</span><span class="hljs-params">(Record&lt;String&gt; record)</span> <span class="hljs-keyword">throws</span> Exception </span>{<br /><br />        }<br /><br />        <span class="hljs-meta">@Override</span><br />        <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">close</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> Exception </span>{<br /><br />        }<br />    }<br /></code></pre>
</span></div></div></div></div>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.8.2/io-cdc"><span class="arrow-prev">← </span><span>CDC connector</span></a><a class="docs-next button" href="/docs/en/2.8.2/io-cli"><span>CLI</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#develop">Develop</a><ul class="toc-headings"><li><a href="#source">Source</a></li></ul></li><li><a href="#handle-schema-information">Handle schema information</a><ul class="toc-headings"><li><a href="#sink">Sink</a></li></ul></li><li><a href="#handling-schema-information">Handling Schema information</a></li><li><a href="#test">Test</a><ul class="toc-headings"><li><a href="#unit-test">Unit test</a></li><li><a href="#integration-test">Integration test</a></li></ul></li><li><a href="#package">Package</a><ul class="toc-headings"><li><a href="#nar">NAR</a></li><li><a href="#uber-jar">Uber JAR</a></li></ul></li><li><a href="#monitor">Monitor</a></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
      // Turn the static "Community" entry of the site navigation into a
      // click-to-toggle dropdown. The header ships a placeholder link with
      // href="#community"; the content of its parent element is replaced below.
      const community = document.querySelector("a[href='#community']").parentNode;

      // Dropdown markup. External links open in a new tab, so they carry
      // rel="noopener noreferrer" to keep the opened page from reaching back
      // through window.opener. The &#x2750; glyph marks an external link.
      const communityMenu =
        '<li>' +
        '<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
        '<div id="community-dropdown" class="hide">' +
          '<ul id="community-dropdown-items">' +
            '<li><a href="/en/contact">Contact</a></li>' +
            '<li><a href="/en/contributing">Contributing</a></li>' +
            '<li><a href="/en/coding-guide">Coding guide</a></li>' +
            '<li><a href="/en/events">Events</a></li>' +
            '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
            '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
            '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
            '<li>&nbsp;</li>' +
            '<li><a href="/en/resources">Resources</a></li>' +
            '<li><a href="/en/team">Team</a></li>' +
            '<li><a href="/en/powered-by">Powered By</a></li>' +
          '</ul>' +
        '</div>' +
        '</li>';

      community.innerHTML = communityMenu;

      // The elements below only exist after the innerHTML assignment above.
      const communityMenuItem = document.getElementById("community-menu");
      const communityDropDown = document.getElementById("community-dropdown");
      communityMenuItem.addEventListener("click", function(event) {
        // The trigger is an href="#" anchor; suppress the jump to the page top.
        event.preventDefault();

        // Flip the dropdown between its two class states, 'hide' and 'visible'.
        if (communityDropDown.className === 'hide') {
          communityDropDown.className = 'visible';
        } else {
          communityDropDown.className = 'hide';
        }
      });
    </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>