blob: 026cfa978a6b9260e074db9d314da2c8cfce48e3 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Develop Connectors · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This guide describes how developers can write new connectors for Pulsar IO to move data"/><meta name="docsearch:version" content="2.4.2"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Develop Connectors · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This guide describes how developers can write new connectors for Pulsar IO to move data"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" 
alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.4.2</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.4.2/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.4.2/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.4.2/io-develop">日本語</a></li><li><a href="/docs/fr/2.4.2/io-develop">Français</a></li><li><a href="/docs/ko/2.4.2/io-develop">한국어</a></li><li><a href="/docs/zh-CN/2.4.2/io-develop">中文</a></li><li><a href="/docs/zh-TW/2.4.2/io-develop">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language switcher: a click on the "languages-menu" link flips the
// "languages-dropdown" element between the "hide" and "visible" class names.
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", (event) => {
  // The menu entry is a link; suppress its default navigation.
  event.preventDefault();
  const isHidden = languagesDropDown.className === "hide";
  languagesDropDown.className = isHidden ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Pulsar IO</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-authentication">Authentication and Authorization</a></li><li 
class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a 
class="navItem" href="/docs/en/2.4.2/io-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-connectors">Builtin Connectors</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.4.2/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/io-cdc">CDC Connector</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/sql-getting-started">Get Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" 
href="/docs/en/2.4.2/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/administration-upgrade">Upgrade</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-encryption">End-to-End 
Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/admin-api-schemas">Schemas</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.4.2/admin-api-functions">Functions</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.4.2/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-connector-admin">Connector Admin CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.2/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
// Sidebar categories: every element with class "collapsible" toggles the
// "hide" class on its next sibling and the "rotate" class on its second
// child node when clicked.
var coll = document.getElementsByClassName('collapsible');
// Stays true until the first category containing the active nav item is found.
var checkActiveCategory = true;
var i, j, links;
for (i = 0; i < coll.length; i += 1) {
  links = coll[i].nextElementSibling.getElementsByTagName('*');
  if (checkActiveCategory) {
    // Toggle the first category that holds the element marked
    // "navListItemActive", then stop looking in later categories.
    for (j = 0; j < links.length; j += 1) {
      if (links[j].classList.contains('navListItemActive')) {
        coll[i].nextElementSibling.classList.toggle('hide');
        coll[i].childNodes[1].classList.toggle('rotate');
        checkActiveCategory = false;
        break;
      }
    }
  }
  coll[i].addEventListener('click', function () {
    // `this` is the clicked category header.
    this.childNodes[1].classList.toggle('rotate');
    this.nextElementSibling.classList.toggle('hide');
  });
}
// Once the DOM is parsed, wire up the navigation and table-of-contents togglers.
document.addEventListener('DOMContentLoaded', function () {
  // Makes a click on the element matching `togglerSelector` toggle `className`
  // on the element matching `targetSelector`. Does nothing when the toggler
  // element is not found.
  function createToggler(togglerSelector, targetSelector, className) {
    var toggler = document.querySelector(togglerSelector);
    var target = document.querySelector(targetSelector);
    if (toggler) {
      toggler.onclick = function (event) {
        event.preventDefault();
        target.classList.toggle(className);
      };
    }
  }

  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  createToggler('#tocToggler', 'body', 'tocActive');

  var headings = document.querySelector('.toc-headings');
  if (headings) {
    headings.addEventListener('click', function (event) {
      // Walk up from the click target towards the headings container; if a
      // link is found on the way, remove "tocActive" from <body>.
      for (var node = event.target; node !== headings; node = node.parentNode) {
        if (node.tagName === 'A') {
          document.body.classList.remove('tocActive');
          break;
        }
      }
    }, false);
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-develop.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Develop Connectors</h1></header><article><div><span><p>This guide describes how developers can write new connectors for Pulsar IO to move data
between Pulsar and other systems. It describes how to create a Pulsar IO connector.</p>
<p>Pulsar IO connectors are specialized <a href="/docs/en/2.4.2/functions-overview">Pulsar Functions</a>. So writing
a Pulsar IO connector is as simple as writing a Pulsar function. Pulsar IO connectors come
in two flavors: <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>Source</code></a>
,
which import data from another system, and <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>Sink</code></a>
,
which export data to another system. For example, <a href="/docs/en/2.4.2/io-kinesis">KinesisSink</a> would export
the messages of a Pulsar topic to a Kinesis stream, and <a href="/docs/en/2.4.2/io-rabbitmq">RabbitmqSource</a> would import
the messages of a RabbitMQ queue to a Pulsar topic.</p>
<h3><a class="anchor" aria-hidden="true" id="developing"></a><a href="#developing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Developing</h3>
<h4><a class="anchor" aria-hidden="true" id="develop-a-source-connector"></a><a href="#develop-a-source-connector" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Develop a source connector</h4>
<p>To develop a source connector, you need to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java"><code>Source</code></a>
interface.</p>
<p>First, you need to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L33"><code>open</code></a>
method. This method will be called once when the source connector
is initialized. In this method, you can retrieve all the connector specific settings through
the passed <code>config</code> parameter, and initialize all the necessary resources. For example, a Kafka
connector can create the Kafka client in this <code>open</code> method.</p>
<p>Besides the passed-in <code>config</code> object, the Pulsar runtime also provides a <code>SourceContext</code> for the
connector to access runtime resources for tasks like collecting metrics. The implementation can
save the <code>SourceContext</code> for further usage.</p>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**
* Open connector with configuration
*
* <span class="hljs-doctag">@param</span> config initialization config
* <span class="hljs-doctag">@param</span> sourceContext
* <span class="hljs-doctag">@throws</span> Exception IO type exceptions when opening a connector
*/</span>
<span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(<span class="hljs-keyword">final</span> Map&lt;String, Object&gt; config, SourceContext sourceContext)</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>The main task for a Source implementor is to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java#L41"><code>read</code></a>
method.</p>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**
* Reads the next message from source.
* If source does not have any new messages, this call should block.
* <span class="hljs-doctag">@return</span> next message from source. The return result should never be null
* <span class="hljs-doctag">@throws</span> Exception
*/</span>
<span class="hljs-function">Record&lt;T&gt; <span class="hljs-title">read</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>The implementation should block on this method if there is nothing to return. It should never return
<code>null</code>. The returned <a href="https://github.com/apache/pulsar/tree/master//pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java#L28"><code>Record</code></a>
should encapsulate the information that is needed by
the Pulsar IO runtime.</p>
<p>This information includes:</p>
<ul>
<li><em>Topic Name</em>: <em>Optional</em>. If the record originates from a Pulsar topic, it should be the Pulsar topic name.</li>
<li><em>Key</em>: <em>Optional</em>. The key associated with the record, if it has one.</li>
<li><em>Value</em>: <em>Required</em>. The actual data of this record.</li>
<li><em>Partition Id</em>: <em>Optional</em>. If the record originates from a partitioned source,
return its partition id. The partition id will be used as part of the unique identifier
by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.</li>
<li><em>Record Sequence</em>: <em>Optional</em>. If the record originates from a sequential source,
return its record sequence. The record sequence will be used as part of the unique identifier
by Pulsar IO runtime to do message deduplication and achieve exactly-once processing guarantee.</li>
<li><em>Properties</em>: <em>Optional</em>. If the record carries user-defined properties, return those properties.</li>
</ul>
<p>Additionally, the implementation of the record should provide two methods: <code>ack</code> and <code>fail</code>. These
two methods will be used by the Pulsar IO connector to acknowledge the records that it has finished
processing and to fail the records that it has failed to process.</p>
<p><a href="https://github.com/apache/pulsar/tree/master//pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java"><code>KafkaSource</code></a>
is a good example to follow.</p>
<h4><a class="anchor" aria-hidden="true" id="develop-a-sink-connector"></a><a href="#develop-a-sink-connector" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Develop a sink connector</h4>
<p>Developing a sink connector is as easy as developing a source connector. You just need to
implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java"><code>Sink</code></a>
interface.</p>
<p>Similarly, you first need to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L36"><code>open</code></a>
method to initialize all the necessary resources
before implementing the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44"><code>write</code></a>
method.</p>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**
* Open connector with configuration
*
* <span class="hljs-doctag">@param</span> config initialization config
* <span class="hljs-doctag">@param</span> sinkContext
* <span class="hljs-doctag">@throws</span> Exception IO type exceptions when opening a connector
*/</span>
<span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">open</span><span class="hljs-params">(<span class="hljs-keyword">final</span> Map&lt;String, Object&gt; config, SinkContext sinkContext)</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>The main task for a Sink implementor is to implement the <a href="https://github.com/apache/pulsar/tree/master//pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java#L44"><code>write</code></a>
method.</p>
<pre><code class="hljs css language-java"> <span class="hljs-comment">/**
* Write a message to Sink
* <span class="hljs-doctag">@param</span> record record to write to sink
* <span class="hljs-doctag">@throws</span> Exception
*/</span>
<span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">write</span><span class="hljs-params">(Record&lt;T&gt; record)</span> <span class="hljs-keyword">throws</span> Exception</span>;
</code></pre>
<p>In the implementation of the <code>write</code> method, the implementor can decide how to write the value and
the optional key to the actual source, and leverage all the provided information such as
<code>Partition Id</code>, <code>Record Sequence</code> for achieving different processing guarantees. The implementor
is also responsible for acknowledging records if it has successfully written them or failing
records if it has failed to write them.</p>
<h3><a class="anchor" aria-hidden="true" id="testing"></a><a href="#testing" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Testing</h3>
<p>Testing connectors can be challenging because Pulsar IO connectors interact with two systems
that may be difficult to mock - Pulsar and the system the connector is connecting to. It is
recommended to write unit tests that specifically test the functionalities of the connector classes
while mocking the external services.</p>
<p>Once you have written sufficient unit tests for your connector, we also recommend adding
separate integration tests to verify end-to-end functionality. In Pulsar, we are using
<a href="https://www.testcontainers.org/">testcontainers</a> for all Pulsar integration tests. Pulsar IO
<a href="https://github.com/apache/pulsar/tree/master//tests/integration/src/test/java/org/apache/pulsar/tests/integration/io"><code>IntegrationTests</code></a>
are good examples to follow on integration testing your connectors.</p>
<h3><a class="anchor" aria-hidden="true" id="packaging"></a><a href="#packaging" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Packaging</h3>
<p>Once you've developed and tested your connector, you must package it so that it can be submitted
to a <a href="/docs/en/2.4.2/functions-overview">Pulsar Functions</a> cluster. The two approaches described
here work with the Pulsar Functions runtime.</p>
<p>If you plan to package and distribute your connector for others to use, you are obligated to
properly license and copyright your own code and to adhere to the licensing and copyrights of
all libraries your code uses and that you include in your distribution. If you are using the
approach described in <a href="#creating-a-nar-package">&quot;Creating a NAR package&quot;</a>, the NAR plugin will
automatically create a <code>DEPENDENCIES</code> file in the generated NAR package, including the proper
licensing and copyrights of all libraries of your connector.</p>
<h4><a class="anchor" aria-hidden="true" id="creating-a-nar-package"></a><a href="#creating-a-nar-package" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Creating a NAR package</h4>
<p>The easiest approach to packaging a Pulsar IO connector is to create a NAR package using
<a href="https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin">nifi-nar-maven-plugin</a>.</p>
<p>NAR stands for NiFi Archive. It is a custom packaging mechanism used by Apache NiFi to provide
a bit of Java ClassLoader isolation. For more details on how NAR works, see this
<a href="https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd">blog post</a>.
Pulsar uses the same mechanism for packaging all the <a href="io-connectors">built-in connectors</a>.</p>
<p>All you need to do is include the <a href="https://mvnrepository.com/artifact/org.apache.nifi/nifi-nar-maven-plugin">nifi-nar-maven-plugin</a> in the Maven project for your connector. For example:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">plugins</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">plugin</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.nifi<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>nifi-nar-maven-plugin<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>1.2.0<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugin</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugins</span>&gt;</span>
</code></pre>
<p>The <a href="https://github.com/apache/pulsar/tree/master/pulsar-io/twitter"><code>TwitterFirehose</code></a>
connector is a good example to follow.</p>
<h4><a class="anchor" aria-hidden="true" id="creating-an-uber-jar"></a><a href="#creating-an-uber-jar" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Creating an Uber JAR</h4>
<p>An alternative approach is to create an <em>uber JAR</em> that contains all of the connector's JAR files
and other resource files. No internal directory structure is necessary.</p>
<p>You can use the <a href="https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html">maven-shade-plugin</a> to create an uber JAR. For example:</p>
<pre><code class="hljs css language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">plugin</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.apache.maven.plugins<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>maven-shade-plugin<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>3.1.1<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">executions</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">execution</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">phase</span>&gt;</span>package<span class="hljs-tag">&lt;/<span class="hljs-name">phase</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">goals</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">goal</span>&gt;</span>shade<span class="hljs-tag">&lt;/<span class="hljs-name">goal</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">goals</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">configuration</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">filters</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">filter</span>&gt;</span>
<span class="hljs-tag">&lt;<span class="hljs-name">artifact</span>&gt;</span>*:*<span class="hljs-tag">&lt;/<span class="hljs-name">artifact</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">filter</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">filters</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">configuration</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">execution</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">executions</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">plugin</span>&gt;</span>
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.4.2/io-connectors"><span class="arrow-prev"></span><span>Built-in connector</span></a><a class="docs-next button" href="/docs/en/2.4.2/io-cdc"><span>CDC connector</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Community dropdown for the site navigation.
//
// Finds the navigation link whose href is "#community", replaces the contents
// of that link's parent node with the dropdown markup built below, and then
// switches the dropdown between the "hide" and "visible" classes each time the
// "Community" entry is clicked.
const communityAnchor = document.querySelector("a[href='#community']");
// Guard against pages that have no "#community" link: dereferencing
// .parentNode on a null match would throw and abort the rest of this script.
const community = communityAnchor ? communityAnchor.parentNode : null;
// NOTE(review): the parent node is presumably an <li>, in which case this
// markup nests an <li> directly inside an <li>. Left as-is because the site
// CSS may rely on that structure; confirm before changing.
const communityMenu =
  '<li>' +
  // aria-expanded / aria-controls expose the open state to assistive
  // technology; the arrow glyph is decorative, so it is hidden from it.
  '<a id="community-menu" href="#" aria-controls="community-dropdown" aria-expanded="false">Community <span style="font-size: 0.75em" aria-hidden="true">&nbsp;▼</span></a>' +
  '<div id="community-dropdown" class="hide">' +
  '<ul id="community-dropdown-items">' +
  '<li><a href="/en/contact">Contact</a></li>' +
  '<li><a href="/en/contributing">Contributing</a></li>' +
  '<li><a href="/en/coding-guide">Coding guide</a></li>' +
  '<li><a href="/en/events">Events</a></li>' +
  // External links open in a new tab; rel="noopener noreferrer" keeps the
  // opened page from reaching back through window.opener. The character
  // references are terminated with ";" so they parse without error.
  '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
  '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
  '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
  '<li>&nbsp;</li>' +
  '<li><a href="/en/resources">Resources</a></li>' +
  '<li><a href="/en/team">Team</a></li>' +
  '<li><a href="/en/powered-by">Powered By</a></li>' +
  '</ul>' +
  '</div>' +
  '</li>';
if (community) {
  community.innerHTML = communityMenu;
}
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
// Both elements exist only if the markup above was injected (or the page
// already provides them); skip the wiring otherwise.
if (communityMenuItem && communityDropDown) {
  communityMenuItem.addEventListener("click", function (event) {
    // The toggle link's href is "#"; suppress the default navigation to it.
    event.preventDefault();
    const willOpen = communityDropDown.className === 'hide';
    communityDropDown.className = willOpen ? 'visible' : 'hide';
    // Keep the announced state in sync with the visible state.
    communityMenuItem.setAttribute('aria-expanded', String(willOpen));
  });
}
</script></span></footer></div><!-- Twitter widgets loader: unless an element with id "twitter-wjs" already exists, creates a script element pointing at https://platform.twitter.com/widgets.js, inserts it before the first script element in the document, and assigns window.twttr an object whose ready(f) pushes callbacks onto its _e array. --><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>