blob: d5d8dd4e08adb993d28592316ca7f09eeefb4f22 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Tutorial: Connect Pulsar with Database · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code. "/><meta name="docsearch:version" content="2.4.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Tutorial: Connect Pulsar with Database · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code. "/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.4.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.4.0/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.4.0/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.4.0/io-quickstart">日本語</a></li><li><a href="/docs/fr/2.4.0/io-quickstart">Français</a></li><li><a href="/docs/ko/2.4.0/io-quickstart">한국어</a></li><li><a href="/docs/zh-CN/2.4.0/io-quickstart">中文</a></li><li><a href="/docs/zh-TW/2.4.0/io-quickstart">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Pulsar IO</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/client-libraries">Use Pulsar with client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-metrics">Metrics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-worker">Admin: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-runtime">Admin: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/functions-debugging">Debugging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/io-overview">Overview</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.4.0/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/io-develop">Developing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/io-cdc">CDC Connector</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/sql-getting-started">Get Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-token-client">Client Authentication using tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-token-admin">Token authentication admin</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.4.0/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-quickstart.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Tutorial: Connect Pulsar with Database</h1></header><article><div><span><p>This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code.</p>
<p>It is helpful to review the <a href="/docs/en/2.4.0/io-overview">concepts</a> for Pulsar I/O with running the steps in this guide to gain a deeper understanding.</p>
<p>At the end of this tutorial, you will be able to:</p>
<ul>
<li><p><a href="#Connect-Pulsar-to-Apache-Cassandra">Connect Pulsar to Apache Cassandra</a></p></li>
<li><p><a href="#Connect-Pulsar-to-MySQL">Connect Pulsar to MySQL</a></p></li>
</ul>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip"></a><a href="#tip" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<ul>
<li><p>These instructions assume you are running Pulsar in <a href="/docs/en/2.4.0/getting-started-standalone">standalone mode</a>. However all
the commands used in this tutorial should be able to be used in a multi-nodes Pulsar cluster without any changes.</p></li>
<li><p>All the instructions are assumed to run at the root directory of a Pulsar binary distribution.</p></li>
</ul>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="install-pulsar-and-builtin-connector"></a><a href="#install-pulsar-and-builtin-connector" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Install Pulsar and builtin connector</h2>
<p>Before connecting Pulsar to a database, we need to install Pulsar and the desired builtin connector.</p>
<p>For more information about how to install a standalone Pulsar and builtin connectors, see <a href="/docs/en/2.4.0/getting-started-standalone/#installing-pulsar">here</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="start-a-standalone-pulsar"></a><a href="#start-a-standalone-pulsar" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start a standalone Pulsar</h2>
<ol>
<li><p>Start Pulsar locally.</p>
<pre><code class="hljs css language-bash">bin/pulsar standalone
</code></pre>
<p>All the components of a Pulsar service will start in order. You can curl those pulsar service endpoints to make sure Pulsar service is up running correctly.</p></li>
<li><p>Check Pulsar binary protocol port.</p>
<pre><code class="hljs css language-bash">telnet localhost 6650
</code></pre></li>
<li><p>Check Pulsar Function cluster.</p>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/worker/cluster
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">[{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
</code></pre></li>
<li><p>Make sure public tenant and default namespace exist.</p>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/namespaces/public
</code></pre>
<p>Example outoupt:</p>
<pre><code class="hljs css language-shell">["public/default","public/functions"]
</code></pre></li>
<li><p>All builtin connectors should be listed as available.</p>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/<span class="hljs-built_in">functions</span>/connectors
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-json">[{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"aerospike"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Aerospike database sink"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.aerospike.AerospikeStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"cassandra"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Writes data into Cassandra"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.cassandra.CassandraStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kafka"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kafka source and sink connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaStringSource"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaBytesSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kinesis"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kinesis sink connector"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kinesis.KinesisSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"rabbitmq"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"RabbitMQ source connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.rabbitmq.RabbitMQSource"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"twitter"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Ingest data from Twitter firehose"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.twitter.TwitterFireHose"</span>}]
</code></pre>
<p>If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running <code>pulsar/standalone</code>,
or you can navigate the <code>logs</code> directory under the Pulsar directory to view the logs.</p></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="connect-pulsar-to-apache-cassandra"></a><a href="#connect-pulsar-to-apache-cassandra" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connect Pulsar to Apache Cassandra</h2>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-1"></a><a href="#tip-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>Make sure you have docker available at your computer. If you don't have docker installed, follow the instructions <a href="https://docs.docker.com/docker-for-mac/install/">here</a>.</p>
</blockquote>
<p>We are using <code>cassandra</code> docker image to start a single-node cassandra cluster in Docker.</p>
<h3><a class="anchor" aria-hidden="true" id="setup-the-cassandra-cluster"></a><a href="#setup-the-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Setup the Cassandra cluster</h3>
<h4><a class="anchor" aria-hidden="true" id="start-a-cassandra-cluster"></a><a href="#start-a-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start a Cassandra cluster</h4>
<pre><code class="hljs css language-bash">docker run -d --rm --name=cassandra -p 9042:9042 cassandra
</code></pre>
<p>Before moving to next steps, make sure the cassandra cluster is up running.</p>
<ol>
<li>Make sure the docker process is running.</li>
</ol>
<pre><code class="hljs css language-bash">docker ps
</code></pre>
<ol start="2">
<li>Check the cassandra logs to make sure cassandra process is running as expected.</li>
</ol>
<pre><code class="hljs css language-bash">docker logs cassandra
</code></pre>
<ol start="3">
<li>Check the cluster status</li>
</ol>
<pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> cassandra nodetool status
</code></pre>
<p>Example output:</p>
<pre><code class="hljs">Datacenter: datacenter1
=======================
<span class="hljs-attribute">Status</span>=Up/Down
|/ <span class="hljs-attribute">State</span>=Normal/Leaving/Joining/Moving
-- <span class="hljs-built_in"> Address </span> Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 103.67 KiB 256 100.0% af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26 rack1
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="create-keyspace-and-table"></a><a href="#create-keyspace-and-table" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create keyspace and table</h4>
<p>We are using <code>cqlsh</code> to connect to the cassandra cluster to create keyspace and table.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP <span class="hljs-keyword">for</span> <span class="hljs-built_in">help</span>.
cqlsh&gt;
</code></pre>
<p>All the following commands are executed in <code>cqlsh</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="create-keyspace-pulsar_test_keyspace"></a><a href="#create-keyspace-pulsar_test_keyspace" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create keyspace <code>pulsar_test_keyspace</code></h4>
<pre><code class="hljs css language-bash">cqlsh&gt; CREATE KEYSPACE pulsar_test_keyspace WITH replication = {<span class="hljs-string">'class'</span>:<span class="hljs-string">'SimpleStrategy'</span>, <span class="hljs-string">'replication_factor'</span>:1};
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="create-table-pulsar_test_table"></a><a href="#create-table-pulsar_test_table" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create table <code>pulsar_test_table</code></h4>
<pre><code class="hljs css language-bash">cqlsh&gt; USE pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace&gt; CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="configure-a-cassandra-sink"></a><a href="#configure-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure a Cassandra sink</h3>
<p>Now that we have a Cassandra cluster running locally. In this section, we will configure a Cassandra sink connector.
The Cassandra sink connector will read messages from a Pulsar topic and write the messages into a Cassandra table.</p>
<p>In order to run a Cassandra sink connector, you need to prepare a yaml config file including information that Pulsar IO
runtime needs to know. For example, how Pulsar IO can find the cassandra cluster, what is the keyspace and table that
Pulsar IO will be using for writing Pulsar messages to.</p>
<p>Create a file <code>examples/cassandra-sink.yml</code> and edit it to fill in following content:</p>
<pre><code class="hljs"><span class="hljs-symbol">configs:</span>
<span class="hljs-symbol"> roots:</span> <span class="hljs-string">"localhost:9042"</span>
<span class="hljs-symbol"> keyspace:</span> <span class="hljs-string">"pulsar_test_keyspace"</span>
<span class="hljs-symbol"> columnFamily:</span> <span class="hljs-string">"pulsar_test_table"</span>
<span class="hljs-symbol"> keyname:</span> <span class="hljs-string">"key"</span>
<span class="hljs-symbol"> columnName:</span> <span class="hljs-string">"col"</span>
</code></pre>
<p>To learn more about Cassandra Connector, see <a href="/docs/en/2.4.0/io-cassandra">Cassandra Connector</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="submit-a-cassandra-sink"></a><a href="#submit-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Submit a Cassandra sink</h3>
<p>Pulsar provides the <a href="/docs/en/2.4.0/reference-cli-tools">CLI</a> for running and managing Pulsar I/O connectors.</p>
<p>We can run following command to sink a sink connector with type <code>cassandra</code> and config file <code>examples/cassandra-sink.yml</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="note"></a><a href="#note" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Note</h4>
<blockquote>
<p>The <code>sink-type</code> parameter of the currently built-in connectors is determined by the setting of the <code>name</code> parameter specified in the pulsar-io.yaml file.</p>
</blockquote>
<pre><code class="hljs css language-shell">bin/pulsar-admin sinks create \
--tenant public \
--namespace default \
--name cassandra-test-sink \
--sink-type cassandra \
--sink-config-file examples/cassandra-sink.yml \
--inputs test_cassandra
</code></pre>
<p>Once the command is executed, Pulsar will create a sink connector named <code>cassandra-test-sink</code> and the sink connector will be running
as a Pulsar Function and write the messages produced in topic <code>test_cassandra</code> to Cassandra table <code>pulsar_test_table</code>.</p>
<h3><a class="anchor" aria-hidden="true" id="inspect-the-cassandra-sink"></a><a href="#inspect-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inspect the Cassandra sink</h3>
<p>You can use <a href="/docs/en/2.4.0/reference-pulsar-admin#sink">sink CLI</a> and <a href="/docs/en/2.4.0/reference-pulsar-admin#source">source CLI</a>
for inspecting and managing the IO connectors.</p>
<h4><a class="anchor" aria-hidden="true" id="retrieve-sink-info"></a><a href="#retrieve-sink-info" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Retrieve Sink Info</h4>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">{
"tenant": "public",
"namespace": "default",
"name": "cassandra-test-sink",
"className": "org.apache.pulsar.io.cassandra.CassandraStringSink",
"inputSpecs": {
"test_cassandra": {
"isRegexPattern": false
}
},
"configs": {
"roots": "localhost:9042",
"keyspace": "pulsar_test_keyspace",
"columnFamily": "pulsar_test_table",
"keyname": "key",
"columnName": "col"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true,
"archive": "builtin://cassandra"
}
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="check-sink-running-status"></a><a href="#check-sink-running-status" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Check Sink running status</h4>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks status \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="verify-the-cassandra-sink"></a><a href="#verify-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Verify the Cassandra sink</h3>
<p>Now lets produce some messages to the input topic of the Cassandra sink <code>test_cassandra</code>.</p>
<pre><code class="hljs css language-bash"><span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> {0..9}; <span class="hljs-keyword">do</span> bin/pulsar-client produce -m <span class="hljs-string">"key-<span class="hljs-variable">$i</span>"</span> -n 1 test_cassandra; <span class="hljs-keyword">done</span>
</code></pre>
<p>Inspect the sink running status again. You should be able to see 10 messages are processed by the Cassandra sink.</p>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks status \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 10,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 10,
"lastReceivedTime" : 1551685489136,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
}
</code></pre>
<p>Finally, lets inspect the results in Cassandra using <code>cqlsh</code></p>
<pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost
</code></pre>
<p>Select the rows from the Cassandra table <code>pulsar_test_table</code>:</p>
<pre><code class="hljs css language-bash">cqlsh&gt; use pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace&gt; select * from pulsar_test_table;
key | col
--------+--------
key-5 | key-5
key-0 | key-0
key-9 | key-9
key-2 | key-2
key-1 | key-1
key-3 | key-3
key-6 | key-6
key-7 | key-7
key-4 | key-4
key-8 | key-8
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="delete-the-cassandra-sink"></a><a href="#delete-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delete the Cassandra Sink</h3>
<pre><code class="hljs css language-shell">bin/pulsar-admin sinks delete \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="connect-pulsar-to-mysql"></a><a href="#connect-pulsar-to-mysql" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connect Pulsar to MySQL</h2>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-2"></a><a href="#tip-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>Make sure you have Docker available at your computer. If you don't have Docker installed, follow the instructions <a href="https://docs.docker.com/docker-for-mac/install/">here</a>.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="setup-a-mysql-cluster"></a><a href="#setup-a-mysql-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Setup a MySQL cluster</h3>
<p>Use the MySQL 5.7 docker image to start a single-node MySQL cluster in Docker.</p>
<ol>
<li><p>Pull the MySQL 5.7 image from Docker Hub.</p>
<pre><code class="hljs css language-text">$ docker pull mysql:5.7
</code></pre></li>
<li><p>Start MySQL.</p>
<pre><code class="hljs css language-text">$ docker run -d -it --rm \
--name pulsar-mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=jdbc \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
mysql:5.7
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-3"></a><a href="#tip-3" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<table>
<thead>
<tr><th>Flag</th><th>Description</th><th>This example</th></tr>
</thead>
<tbody>
<tr><td><code>-d</code></td><td>To start a container in detached mode.</td><td>/</td></tr>
<tr><td><code>-it</code></td><td>Keep STDIN open even if not attached and allocate a terminal.</td><td>/</td></tr>
<tr><td><code>--rm</code></td><td>Remove the container automatically when it exits.</td><td>/</td></tr>
<tr><td><code>-name</code></td><td>Assign a name to the container.</td><td>This example specifies <em>pulsar-mysql</em> for the container.</td></tr>
<tr><td><code>-p</code></td><td>Publish the port of the container to the host.</td><td>This example publishes the port <em>3306</em> of the container to the host.</td></tr>
<tr><td><code>-e</code></td><td>Set environment variables.</td><td>This example sets the following variables:<br>- The password for the root user is <em>jdbc</em>. <br>- The name for the normal user is <em>mysqluser</em>. <br>- The password for the normal user is <em>mysqlpw</em>.</td></tr>
</tbody>
</table>
<p>For more information about Docker command, see <a href="https://docs.docker.com/engine/reference/commandline/run/">here</a>.</p>
</blockquote></li>
<li><p>Check if MySQL has been started successfully.</p>
<pre><code class="hljs css language-text">$ docker logs -f pulsar-mysql
</code></pre>
<p>MySQL has been started successfully if the following message appears.</p>
<pre><code class="hljs css language-text">2019-05-11T10:40:58.709964Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
2019-05-11T10:40:58.710155Z 0 [Warning] CA certificate ca.pem is self signed.
2019-05-11T10:40:58.711921Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2019-05-11T10:40:58.711985Z 0 [Note] IPv6 is available.
2019-05-11T10:40:58.712695Z 0 [Note] - '::' resolves to '::';
2019-05-11T10:40:58.712742Z 0 [Note] Server socket created on IP: '::'.
2019-05-11T10:40:58.714334Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
2019-05-11T10:40:58.723802Z 0 [Note] Event Scheduler: Loaded 0 events
2019-05-11T10:40:58.724200Z 0 [Note] mysqld: ready for connections.
Version: '5.7.26' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
</code></pre></li>
<li><p>Access to MySQL.</p>
<pre><code class="hljs css language-text">$ docker exec -it pulsar-mysql /bin/bash
mysql -h localhost -uroot -pjdbc
</code></pre></li>
<li><p>Create a <em>pulsar_mysql_jdbc_sink</em> table.</p>
<pre><code class="hljs css language-text">$ create database pulsar_mysql_jdbc_sink;
$ use pulsar_mysql_jdbc_sink;
$ create table if not exists pulsar_mysql_jdbc_sink
(
id INT AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
primary key (id)
)
engine=innodb;
</code></pre></li>
</ol>
<h3><a class="anchor" aria-hidden="true" id="configure-a-jdbc-sink"></a><a href="#configure-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure a JDBC sink</h3>
<p>Now that we have a MySQL running locally. In this section, we will configure a JDBC sink connector. The JDBC sink connector will read messages from a Pulsar topic and write messages into a MySQL table.</p>
<ol>
<li><p>Add a configuration file.</p>
<p>To run a JDBC sink connector, you need to prepare a yaml config file including the information that Pulsar IO runtime needs to know. For example, how Pulsar IO can find the MySQL cluster, what is the JDBCURL and the table that Pulsar IO will use for writing messages to.</p>
<p>Create a <em>pulsar-mysql-jdbc-sink.yaml</em> file, copy the following contents to this file, and place the file in the <code>pulsar/connectors</code> folder.</p>
<pre><code class="hljs css language-text">configs:
userName: "root"
password: "jdbc"
jdbcUrl: "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink"
tableName: "pulsar_mysql_jdbc_sink"
</code></pre></li>
<li><p>Create a schema.</p>
<p>Create a <em>avro-schema</em> file, copy the following contents to this file, and place the file in the <code>pulsar/connectors</code> folder.</p>
<pre><code class="hljs css language-text">{
"type": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
"properties": {}
}
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-4"></a><a href="#tip-4" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about AVRO, see <a href="https://avro.apache.org/docs/1.8.2/">Apache Avro Documentation</a>.</p>
</blockquote></li>
</ol>
<ol start="3">
<li><p>Upload a schema to a topic.</p>
<p>This example uploads the <em>avro-schema</em> schema to the <em>pulsar-mysql-jdbc-sink-topic</em> topic.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin schemas upload pulsar-mysql-jdbc-sink-topic -f ./connectors/avro-schema
</code></pre></li>
<li><p>Check if the schema has been uploaded successfully.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin schemas get pulsar-mysql-jdbc-sink-topic
</code></pre>
<p>The schema has been uploaded successfully if the following message appears.</p>
<pre><code class="hljs css language-text">{"name":"pulsar-mysql-jdbc-sink-topic","schema":"{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"AVRO","properties":{}}
</code></pre></li>
</ol>
<h3><a class="anchor" aria-hidden="true" id="submit-a-jdbc-sink"></a><a href="#submit-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Submit a JDBC sink</h3>
<p>Pulsar provides the <a href="/docs/en/2.4.0/admin-api-overview">CLI</a> for running and managing Pulsar I/O connectors.</p>
<p>This example creates a sink connector and specifies the desired information.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks create \
--archive ./connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs pulsar-mysql-jdbc-sink-topic \
--name pulsar-mysql-jdbc-sink \
--sink-config-file ./connectors/pulsar-mysql-jdbc-sink.yaml \
--parallelism 1
</code></pre>
<p>Once the command is executed, Pulsar will create a sink connector named <em>pulsar-mysql-jdbc-sink</em>, and the sink connector will be running as a Pulsar Function and write the messages produced in the <em>pulsar-mysql-jdbc-sink-topic</em> topic to the MySQL <em>pulsar_mysql_jdbc_sink</em> table.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-5"></a><a href="#tip-5" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<table>
<thead>
<tr><th>Flag</th><th>Description</th><th>This example</th></tr>
</thead>
<tbody>
<tr><td><code>--archive</code></td><td>Path to the archive file for the sink.</td><td><em>pulsar-io-jdbc-2.4.0.nar</em></td></tr>
<tr><td><code>--inputs</code></td><td>The input topic or topics of the sink. <br> (Multiple topics can be specified as a comma-separated list.)</td></tr>
<tr><td><code>--name</code></td><td>The name of the sink.</td><td><em>pulsar-mysql-jdbc-sink</em></td></tr>
<tr><td><code>--sink-config-file</code></td><td>The path to a YAML config file specifying the configuration of the sink.</td><td><em>pulsar-mysql-jdbc-sink.yaml</em></td></tr>
<tr><td><code>--parallelism</code></td><td>The parallelism factor of the sink. <br> For example, the number of sink instances to run.</td><td><em>1</em></td></tr>
</tbody>
</table>
<p>For more information about <code>pulsar-admin sinks create options</code>, see <a href="/docs/en/2.4.0/reference-pulsar-admin#create-3">here</a>.</p>
</blockquote>
<p>The sink has been created successfully if the following message appears.</p>
<pre><code class="hljs css language-text">"Created successfully"
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="inspect-a-jdbc-sink"></a><a href="#inspect-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inspect a JDBC sink</h3>
<h4><a class="anchor" aria-hidden="true" id="list-all-running-jdbc-sinks"></a><a href="#list-all-running-jdbc-sinks" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>List all running JDBC sink(s)</h4>
<p>This example lists all running sink connectors.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks list \
--tenant public \
--namespace default
</code></pre>
<p>The result shows that only the <em>mysql-jdbc-sink</em> sink is running.</p>
<pre><code class="hljs css language-text">[
"pulsar-mysql-jdbc-sink"
]
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="get-information-of-a-jdbc-sink"></a><a href="#get-information-of-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Get information of a JDBC sink</h4>
<p>This example gets the information about the <em>pulsar-mysql-jdbc-sink</em> sink connector.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink
</code></pre>
<p>The result show the information of the sink connector, including tenant, namespace, topic and so on.</p>
<pre><code class="hljs css language-text">{
"tenant": "public",
"namespace": "default",
"name": "pulsar-mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"pulsar-mysql-jdbc-sink-topic": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink",
"userName": "root",
"tableName": "pulsar_mysql_jdbc_sink"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="get-status-of-a-jdbc-sink"></a><a href="#get-status-of-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Get status of a JDBC sink</h4>
<p>This example checks the current status of the <em>pulsar-mysql-jdbc-sink</em> sink connector.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks status \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink
</code></pre>
<p>The result shows the current status of sink connector, including the number of instance, running status, worker ID and so on.</p>
<pre><code class="hljs css language-text">{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-192.168.2.52-8080"
}
} ]
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="stop-a-jdbc-sink"></a><a href="#stop-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Stop a JDBC sink</h3>
<p>This example stops the <em>pulsar-mysql-jdbc-sink</em> sink instance.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks stop \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink \
--instance-id 0
</code></pre>
<p>The sink instance has been stopped successfully if the following message disappears.</p>
<pre><code class="hljs css language-text">"Stopped successfully"
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="restart-a-jdbc-sink"></a><a href="#restart-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Restart a JDBC sink</h3>
<p>This example starts the <em>pulsar-mysql-jdbc-sink</em> sink instance.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks start \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink \
--instance-id 0
</code></pre>
<p>The sink instance has been started successfully if the following message disappears.</p>
<pre><code class="hljs css language-text">"Started successfully"
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-6"></a><a href="#tip-6" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>Optionally, you can run a standalone sink connector using <code>pulsar-admin sinks localrun options</code>.</p>
<p>Note that <code>pulsar-admin sinks localrun options</code> runs a sink connector locally, while <code>pulsar-admin sinks start options</code> starts a sink connector in a cluster.</p>
<p>For more information about <code>pulsar-admin sinks localrun options</code>, see <a href="/docs/en/2.4.0/reference-pulsar-admin#localrun-1">here</a>.</p>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="update-a-jdbc-sink"></a><a href="#update-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Update a JDBC sink</h3>
<p>This example updates the parallelism of the <em>pulsar-mysql-jdbc-sink</em> sink connector to 2.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks update \
--name pulsar-mysql-jdbc-sink \
--parallelism 2
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-7"></a><a href="#tip-7" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks update options</code>, see <a href="/docs/en/2.4.0/reference-pulsar-admin#update-2">here</a>.</p>
</blockquote>
<p>The sink connector has been updated successfully if the following message disappears.</p>
<pre><code class="hljs css language-text">"Updated successfully"
</code></pre>
<p>This example double-checks the information.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink
</code></pre>
<p>The result shows that the parallelism is 2.</p>
<pre><code class="hljs css language-text">{
"tenant": "public",
"namespace": "default",
"name": "pulsar-mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"pulsar-mysql-jdbc-sink-topic": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink",
"userName": "root",
"tableName": "pulsar_mysql_jdbc_sink"
},
"parallelism": 2,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="delete-a-jdbc-sink"></a><a href="#delete-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delete a JDBC sink</h3>
<p>This example deletes the <em>pulsar-mysql-jdbc-sink</em> sink connector.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks delete \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-8"></a><a href="#tip-8" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks delete options</code>, see <a href="/docs/en/2.4.0/reference-pulsar-admin#delete-4">here</a>.</p>
</blockquote>
<p>The sink connector has been deleted successfully if the following message appears.</p>
<pre><code class="hljs css language-text">"Deleted successfully"
</code></pre>
<p>This example double-checks the existence of the sink connector.</p>
<pre><code class="hljs css language-text">$ bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name pulsar-mysql-jdbc-sink
</code></pre>
<p>The results shows that the sink connector does not exist.</p>
<pre><code class="hljs css language-text">HTTP 404 Not Found
Reason: Sink pulsar-mysql-jdbc-sink doesn't exist
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.4.0/io-overview"><span class="arrow-prev"></span><span>Overview</span></a><a class="docs-next button" href="/docs/en/2.4.0/io-managing"><span>Next</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#install-pulsar-and-builtin-connector">Install Pulsar and builtin connector</a></li><li><a href="#start-a-standalone-pulsar">Start a standalone Pulsar</a></li><li><a href="#connect-pulsar-to-apache-cassandra">Connect Pulsar to Apache Cassandra</a><ul class="toc-headings"><li><a href="#setup-the-cassandra-cluster">Setup the Cassandra cluster</a></li><li><a href="#configure-a-cassandra-sink">Configure a Cassandra sink</a></li><li><a href="#submit-a-cassandra-sink">Submit a Cassandra sink</a></li><li><a href="#inspect-the-cassandra-sink">Inspect the Cassandra sink</a></li><li><a href="#verify-the-cassandra-sink">Verify the Cassandra sink</a></li><li><a href="#delete-the-cassandra-sink">Delete the Cassandra Sink</a></li></ul></li><li><a href="#connect-pulsar-to-mysql">Connect Pulsar to MySQL</a><ul class="toc-headings"><li><a href="#setup-a-mysql-cluster">Setup a MySQL cluster</a></li><li><a href="#configure-a-jdbc-sink">Configure a JDBC sink</a></li><li><a href="#submit-a-jdbc-sink">Submit a JDBC sink</a></li><li><a href="#inspect-a-jdbc-sink">Inspect a JDBC sink</a></li><li><a href="#stop-a-jdbc-sink">Stop a JDBC sink</a></li><li><a href="#restart-a-jdbc-sink">Restart a JDBC sink</a></li><li><a href="#update-a-jdbc-sink">Update a JDBC sink</a></li><li><a href="#delete-a-jdbc-sink">Delete a JDBC sink</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>