blob: 5cb60939e595ec9a8f3ca4a8c8ae7bc30915916f [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>How to connect Pulsar to database · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code. "/><meta name="docsearch:version" content="next"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="How to connect Pulsar to database · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code. "/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>next</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/next/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/next/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/next/io-quickstart">日本語</a></li><li><a href="/docs/fr/next/io-quickstart">Français</a></li><li><a href="/docs/ko/next/io-quickstart">한국어</a></li><li><a href="/docs/zh-CN/next/io-quickstart">中文</a></li><li><a href="/docs/zh-TW/next/io-quickstart">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Pulsar IO</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Get Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/getting-started-docker">Run Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/getting-started-helm">Run Pulsar in Kubernetes</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-proxy-sni-routing">Proxy support with SNI routing</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/concepts-multiple-advertised-listeners">Multiple advertised listeners</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Schema</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-get-started">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-understand">Understand schema</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-evolution-compatibility">Schema evolution and compatibility</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/schema-manage">Manage schema</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-runtime">Setup: Configure Functions runtime</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-worker">Setup: Pulsar Functions Worker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-develop">How-to: Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-package">How-to: Package</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-debug">How-to: Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-deploy">How-to: Deploy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/functions-cli">Reference: CLI</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/window-functions-context">Window Functions: Context</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/io-overview">Overview</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/next/io-quickstart">Get started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-use">Use</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-debug">Debug</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-connectors">Built-in connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-cdc">CDC connector</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-develop">Develop</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/io-cli">CLI</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-getting-started">Query data</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-deployment-configurations">Configuration and deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/sql-rest-api">REST APIs</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Tiered Storage</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-aws">AWS S3 offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-gcs">GCS offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-filesystem">Filesystem offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-azure">Azure BlobStore offloader</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/tiered-storage-aliyun">Aliyun OSS offloader</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Transactions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-why">Why transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-what">What are transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-how">How transactions work?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-use">How to use transactions?</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/txn-monitor">How to monitor transactions?</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Kubernetes (Helm)</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-prepare">Prepare</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-install">Install</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-deploy">Deployment</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/helm-tools">Required Tools</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-docker">Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/deploy-monitoring">Monitor</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-pulsar-manager">Pulsar Manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-load-balance">Load balance</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-proxy">Pulsar proxy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-upgrade">Upgrade</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/administration-isolation">Pulsar isolation</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-policy-and-supported-versions">Security Policy and Supported Versions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-tls-keystore">Using TLS with KeyStore configure</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-jwt">Authentication using JWT</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-kerberos">Authentication using Kerberos</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-oauth2">Authentication using OAuth 2.0 access tokens</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-extending">Extend Authentication and Authorization</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/security-bouncy-castle">Bouncy Castle Providers</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Performance</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/performance-pulsar-perf">Pulsar Perf</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-node">Node.js</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-websocket">WebSocket</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-dotnet">C#</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/client-libraries-rest">REST</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-topics">Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-functions">Functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/admin-api-packages">Packages</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-message-queue">Message queue</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/cookbooks-bookkeepermetadata">BookKeeper Ledger Metadata</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/develop-plugin">Plugin</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-configuration">Pulsar configuration</a></li><li class="navListItem"><a class="navItem" href="/docs/en/next/reference-metrics">Pulsar Metrics</a></li></ul></div></div></section></div><script>
var coll = document.getElementsByClassName('collapsible');
var checkActiveCategory = true;
for (var i = 0; i < coll.length; i++) {
var links = coll[i].nextElementSibling.getElementsByTagName('*');
if (checkActiveCategory){
for (var j = 0; j < links.length; j++) {
if (links[j].classList.contains('navListItemActive')){
coll[i].nextElementSibling.classList.toggle('hide');
coll[i].childNodes[1].classList.toggle('rotate');
checkActiveCategory = false;
break;
}
}
}
coll[i].addEventListener('click', function() {
var arrow = this.childNodes[1];
arrow.classList.toggle('rotate');
var content = this.nextElementSibling;
content.classList.toggle('hide');
});
}
document.addEventListener('DOMContentLoaded', function() {
createToggler('#navToggler', '#docsNav', 'docsSliderActive');
createToggler('#tocToggler', 'body', 'tocActive');
var headings = document.querySelector('.toc-headings');
headings && headings.addEventListener('click', function(event) {
var el = event.target;
while(el !== headings){
if (el.tagName === 'A') {
document.body.classList.remove('tocActive');
break;
} else{
el = el.parentNode;
}
}
}, false);
function createToggler(togglerSelector, targetSelector, className) {
var toggler = document.querySelector(togglerSelector);
var target = document.querySelector(targetSelector);
if (!toggler) {
return;
}
toggler.onclick = function(event) {
event.preventDefault();
target.classList.toggle(className);
};
}
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-quickstart.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">How to connect Pulsar to database</h1></header><article><div><span><p>This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code.</p>
<p>It is helpful to review the <a href="/docs/en/next/io-overview">concepts</a> for Pulsar I/O with running the steps in this guide to gain a deeper understanding.</p>
<p>At the end of this tutorial, you are able to:</p>
<ul>
<li><p><a href="#Connect-Pulsar-to-Cassandra">Connect Pulsar to Cassandra</a></p></li>
<li><p><a href="#Connect-Pulsar-to-PostgreSQL">Connect Pulsar to PostgreSQL</a></p></li>
</ul>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip"></a><a href="#tip" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<ul>
<li><p>These instructions assume you are running Pulsar in <a href="/docs/en/next/getting-started-standalone">standalone mode</a>. However, all
the commands used in this tutorial can be used in a multi-nodes Pulsar cluster without any changes.</p></li>
<li><p>All the instructions are assumed to run at the root directory of a Pulsar binary distribution.</p></li>
</ul>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="install-pulsar-and-built-in-connector"></a><a href="#install-pulsar-and-built-in-connector" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Install Pulsar and built-in connector</h2>
<p>Before connecting Pulsar to a database, you need to install Pulsar and the desired built-in connector.</p>
<p>For more information about <strong>how to install a standalone Pulsar and built-in connectors</strong>, see <a href="/docs/en/next/getting-started-standalone/#installing-pulsar">here</a>.</p>
<h2><a class="anchor" aria-hidden="true" id="start-pulsar-standalone"></a><a href="#start-pulsar-standalone" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start Pulsar standalone</h2>
<ol>
<li><p>Start Pulsar locally.</p>
<pre><code class="hljs css language-bash">bin/pulsar standalone
</code></pre>
<p>All the components of a Pulsar service are start in order.</p>
<p>You can curl those pulsar service endpoints to make sure Pulsar service is up running correctly.</p></li>
<li><p>Check Pulsar binary protocol port.</p>
<pre><code class="hljs css language-bash">telnet localhost 6650
</code></pre></li>
<li><p>Check Pulsar Function cluster.</p>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/worker/cluster
</code></pre>
<p><strong>Example output</strong></p>
<pre><code class="hljs css language-json">[{<span class="hljs-attr">"workerId"</span>:<span class="hljs-string">"c-standalone-fw-localhost-6750"</span>,<span class="hljs-attr">"workerHostname"</span>:<span class="hljs-string">"localhost"</span>,<span class="hljs-attr">"port"</span>:<span class="hljs-number">6750</span>}]
</code></pre></li>
<li><p>Make sure a public tenant and a default namespace exist.</p>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/namespaces/public
</code></pre>
<p><strong>Example output</strong></p>
<pre><code class="hljs css language-json">[<span class="hljs-string">"public/default"</span>,<span class="hljs-string">"public/functions"</span>]
</code></pre></li>
<li><p>All built-in connectors should be listed as available.</p>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/<span class="hljs-built_in">functions</span>/connectors
</code></pre>
<p><strong>Example output</strong></p>
<pre><code class="hljs css language-json">[{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"aerospike"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Aerospike database sink"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.aerospike.AerospikeStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"cassandra"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Writes data into Cassandra"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.cassandra.CassandraStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kafka"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kafka source and sink connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaStringSource"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaBytesSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kinesis"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kinesis sink connector"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kinesis.KinesisSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"rabbitmq"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"RabbitMQ source connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.rabbitmq.RabbitMQSource"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"twitter"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Ingest data from Twitter firehose"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.twitter.TwitterFireHose"</span>}]
</code></pre>
<p>If an error occurs when starting Pulsar service, you may see an exception at the terminal running <code>pulsar/standalone</code>,
or you can navigate to the <code>logs</code> directory under the Pulsar directory to view the logs.</p></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="connect-pulsar-to-cassandra"></a><a href="#connect-pulsar-to-cassandra" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connect Pulsar to Cassandra</h2>
<p>This section demonstrates how to connect Pulsar to Cassandra.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-1"></a><a href="#tip-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<ul>
<li><p>Make sure you have Docker installed. If you do not have one, see <a href="https://docs.docker.com/docker-for-mac/install/">install Docker</a>.</p></li>
<li><p>The Cassandra sink connector reads messages from Pulsar topics and writes the messages into Cassandra tables. For more information, see <a href="/docs/en/next/io-cassandra-sink">Cassandra sink connector</a>.</p></li>
</ul>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="setup-a-cassandra-cluster"></a><a href="#setup-a-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Setup a Cassandra cluster</h3>
<p>This example uses <code>cassandra</code> Docker image to start a single-node Cassandra cluster in Docker.</p>
<ol>
<li><p>Start a Cassandra cluster.</p>
<pre><code class="hljs css language-bash">docker run -d --rm --name=cassandra -p 9042:9042 cassandra
</code></pre>
<blockquote>
<p><strong>Note</strong></p>
<p>Before moving to the next steps, make sure the Cassandra cluster is running.</p>
</blockquote></li>
<li><p>Make sure the Docker process is running.</p>
<pre><code class="hljs css language-bash">docker ps
</code></pre></li>
<li><p>Check the Cassandra logs to make sure the Cassandra process is running as expected.</p>
<pre><code class="hljs css language-bash">docker logs cassandra
</code></pre></li>
<li><p>Check the status of the Cassandra cluster.</p>
<pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> cassandra nodetool status
</code></pre>
<p><strong>Example output</strong></p>
<pre><code class="hljs">Datacenter: datacenter1
=======================
<span class="hljs-attribute">Status</span>=Up/Down
|/ <span class="hljs-attribute">State</span>=Normal/Leaving/Joining/Moving
-- <span class="hljs-built_in"> Address </span> Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 103.67 KiB 256 100.0% af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26 rack1
</code></pre></li>
<li><p>Use <code>cqlsh</code> to connect to the Cassandra cluster.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP <span class="hljs-keyword">for</span> <span class="hljs-built_in">help</span>.
cqlsh&gt;
</code></pre></li>
<li><p>Create a keyspace <code>pulsar_test_keyspace</code>.</p>
<pre><code class="hljs css language-bash">cqlsh&gt; CREATE KEYSPACE pulsar_test_keyspace WITH replication = {<span class="hljs-string">'class'</span>:<span class="hljs-string">'SimpleStrategy'</span>, <span class="hljs-string">'replication_factor'</span>:1};
</code></pre></li>
<li><p>Create a table <code>pulsar_test_table</code>.</p>
<pre><code class="hljs css language-bash">cqlsh&gt; USE pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace&gt; CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
</code></pre></li>
</ol>
<h3><a class="anchor" aria-hidden="true" id="configure-a-cassandra-sink"></a><a href="#configure-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure a Cassandra sink</h3>
<p>Now that we have a Cassandra cluster running locally.</p>
<p>In this section, you need to configure a Cassandra sink connector.</p>
<p>To run a Cassandra sink connector, you need to prepare a configuration file including the information that Pulsar connector runtime needs to know.</p>
<p>For example, how Pulsar connector can find the Cassandra cluster, what is the keyspace and the table that Pulsar connector uses for writing Pulsar messages to, and so on.</p>
<p>You can create a configuration file through one of the following methods.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"roots"</span>: <span class="hljs-string">"localhost:9042"</span>,
<span class="hljs-attr">"keyspace"</span>: <span class="hljs-string">"pulsar_test_keyspace"</span>,
<span class="hljs-attr">"columnFamily"</span>: <span class="hljs-string">"pulsar_test_table"</span>,
<span class="hljs-attr">"keyname"</span>: <span class="hljs-string">"key"</span>,
<span class="hljs-attr">"columnName"</span>: <span class="hljs-string">"col"</span>
}
</code></pre></li>
<li><p>YAML</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
<span class="hljs-attr">roots:</span> <span class="hljs-string">"localhost:9042"</span>
<span class="hljs-attr">keyspace:</span> <span class="hljs-string">"pulsar_test_keyspace"</span>
<span class="hljs-attr">columnFamily:</span> <span class="hljs-string">"pulsar_test_table"</span>
<span class="hljs-attr">keyname:</span> <span class="hljs-string">"key"</span>
<span class="hljs-attr">columnName:</span> <span class="hljs-string">"col"</span>
</code></pre></li>
</ul>
<p>For more information, see <a href="/docs/en/next/io-cassandra-sink">Cassandra sink connector</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="create-a-cassandra-sink"></a><a href="#create-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create a Cassandra sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to create a sink connector and perform other operations on them.</p>
<p>Run the following command to create a Cassandra sink connector with sink type <em>cassandra</em> and the config file <em>examples/cassandra-sink.yml</em> created previously.</p>
<h4><a class="anchor" aria-hidden="true" id="note"></a><a href="#note" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Note</h4>
<blockquote>
<p>The <code>sink-type</code> parameter of the currently built-in connectors is determined by the setting of the <code>name</code> parameter specified in the pulsar-io.yaml file.</p>
</blockquote>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks create \
--tenant public \
--namespace default \
--name cassandra-test-sink \
--sink-type cassandra \
--sink-config-file examples/cassandra-sink.yml \
--inputs test_cassandra
</code></pre>
<p>Once the command is executed, Pulsar creates the sink connector <em>cassandra-test-sink</em>.</p>
<p>This sink connector runs
as a Pulsar Function and writes the messages produced in the topic <em>test_cassandra</em> to the Cassandra table <em>pulsar_test_table</em>.</p>
<h3><a class="anchor" aria-hidden="true" id="inspect-a-cassandra-sink"></a><a href="#inspect-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inspect a Cassandra sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to monitor a connector and perform other operations on it.</p>
<ul>
<li><p>Get the information of a Cassandra sink.</p>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p><strong>Example output</strong></p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"tenant"</span>: <span class="hljs-string">"public"</span>,
<span class="hljs-attr">"namespace"</span>: <span class="hljs-string">"default"</span>,
<span class="hljs-attr">"name"</span>: <span class="hljs-string">"cassandra-test-sink"</span>,
<span class="hljs-attr">"className"</span>: <span class="hljs-string">"org.apache.pulsar.io.cassandra.CassandraStringSink"</span>,
<span class="hljs-attr">"inputSpecs"</span>: {
<span class="hljs-attr">"test_cassandra"</span>: {
<span class="hljs-attr">"isRegexPattern"</span>: <span class="hljs-literal">false</span>
}
},
<span class="hljs-attr">"configs"</span>: {
<span class="hljs-attr">"roots"</span>: <span class="hljs-string">"localhost:9042"</span>,
<span class="hljs-attr">"keyspace"</span>: <span class="hljs-string">"pulsar_test_keyspace"</span>,
<span class="hljs-attr">"columnFamily"</span>: <span class="hljs-string">"pulsar_test_table"</span>,
<span class="hljs-attr">"keyname"</span>: <span class="hljs-string">"key"</span>,
<span class="hljs-attr">"columnName"</span>: <span class="hljs-string">"col"</span>
},
<span class="hljs-attr">"parallelism"</span>: <span class="hljs-number">1</span>,
<span class="hljs-attr">"processingGuarantees"</span>: <span class="hljs-string">"ATLEAST_ONCE"</span>,
<span class="hljs-attr">"retainOrdering"</span>: <span class="hljs-literal">false</span>,
<span class="hljs-attr">"autoAck"</span>: <span class="hljs-literal">true</span>,
<span class="hljs-attr">"archive"</span>: <span class="hljs-string">"builtin://cassandra"</span>
}
</code></pre></li>
<li><p>Check the status of a Cassandra sink.</p>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks status \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p><strong>Example output</strong></p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"numInstances"</span> : <span class="hljs-number">1</span>,
<span class="hljs-attr">"numRunning"</span> : <span class="hljs-number">1</span>,
<span class="hljs-attr">"instances"</span> : [ {
<span class="hljs-attr">"instanceId"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"status"</span> : {
<span class="hljs-attr">"running"</span> : <span class="hljs-literal">true</span>,
<span class="hljs-attr">"error"</span> : <span class="hljs-string">""</span>,
<span class="hljs-attr">"numRestarts"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"numReadFromPulsar"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"numSystemExceptions"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"latestSystemExceptions"</span> : [ ],
<span class="hljs-attr">"numSinkExceptions"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"latestSinkExceptions"</span> : [ ],
<span class="hljs-attr">"numWrittenToSink"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"lastReceivedTime"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"workerId"</span> : <span class="hljs-string">"c-standalone-fw-localhost-8080"</span>
}
} ]
}
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="verify-a-cassandra-sink"></a><a href="#verify-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Verify a Cassandra sink</h3>
<ol>
<li><p>Produce some messages to the input topic of the Cassandra sink <em>test_cassandra</em>.</p>
<pre><code class="hljs css language-bash"><span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> {0..9}; <span class="hljs-keyword">do</span> bin/pulsar-client produce -m <span class="hljs-string">"key-<span class="hljs-variable">$i</span>"</span> -n 1 test_cassandra; <span class="hljs-keyword">done</span>
</code></pre></li>
<li><p>Inspect the status of the Cassandra sink <em>test_cassandra</em>.</p>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks status \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>You can see 10 messages are processed by the Cassandra sink <em>test_cassandra</em>.</p>
<p><strong>Example output</strong></p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"numInstances"</span> : <span class="hljs-number">1</span>,
<span class="hljs-attr">"numRunning"</span> : <span class="hljs-number">1</span>,
<span class="hljs-attr">"instances"</span> : [ {
<span class="hljs-attr">"instanceId"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"status"</span> : {
<span class="hljs-attr">"running"</span> : <span class="hljs-literal">true</span>,
<span class="hljs-attr">"error"</span> : <span class="hljs-string">""</span>,
<span class="hljs-attr">"numRestarts"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"numReadFromPulsar"</span> : <span class="hljs-number">10</span>,
<span class="hljs-attr">"numSystemExceptions"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"latestSystemExceptions"</span> : [ ],
<span class="hljs-attr">"numSinkExceptions"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"latestSinkExceptions"</span> : [ ],
<span class="hljs-attr">"numWrittenToSink"</span> : <span class="hljs-number">10</span>,
<span class="hljs-attr">"lastReceivedTime"</span> : <span class="hljs-number">1551685489136</span>,
<span class="hljs-attr">"workerId"</span> : <span class="hljs-string">"c-standalone-fw-localhost-8080"</span>
}
} ]
}
</code></pre></li>
<li><p>Use <code>cqlsh</code> to connect to the Cassandra cluster.</p>
<pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost
</code></pre></li>
<li><p>Check the data of the Cassandra table <em>pulsar_test_table</em>.</p>
<pre><code class="hljs css language-bash">cqlsh&gt; use pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace&gt; select * from pulsar_test_table;
key | col
--------+--------
key-5 | key-5
key-0 | key-0
key-9 | key-9
key-2 | key-2
key-1 | key-1
key-3 | key-3
key-6 | key-6
key-7 | key-7
key-4 | key-4
key-8 | key-8
</code></pre></li>
</ol>
<h3><a class="anchor" aria-hidden="true" id="delete-a-cassandra-sink"></a><a href="#delete-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delete a Cassandra Sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to delete a connector and perform other operations on it.</p>
<pre><code class="hljs css language-bash">bin/pulsar-admin sinks delete \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="connect-pulsar-to-postgresql"></a><a href="#connect-pulsar-to-postgresql" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connect Pulsar to PostgreSQL</h2>
<p>This section demonstrates how to connect Pulsar to PostgreSQL.</p>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-2"></a><a href="#tip-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<ul>
<li><p>Make sure you have Docker installed. If you do not have one, see <a href="https://docs.docker.com/docker-for-mac/install/">install Docker</a>.</p></li>
<li><p>The JDBC sink connector pulls messages from Pulsar topics
and persists the messages to ClickHouse, MariaDB, PostgreSQL, or SQlite.
For more information, see <a href="/docs/en/next/io-jdbc-sink">JDBC sink connector</a>.</p></li>
</ul>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="setup-a-postgresql-cluster"></a><a href="#setup-a-postgresql-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Setup a PostgreSQL cluster</h3>
<p>This example uses the PostgreSQL 12 docker image to start a single-node PostgreSQL cluster in Docker.</p>
<ol>
<li><p>Pull the PostgreSQL 12 image from Docker.</p>
<pre><code class="hljs css language-bash">$ docker pull postgres:12
</code></pre></li>
<li><p>Start PostgreSQL.</p>
<pre><code class="hljs css language-bash">$ docker run -d -it --rm \
--name pulsar-postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_USER=postgres \
postgres:12
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="tip-3"></a><a href="#tip-3" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<table>
<thead>
<tr><th>Flag</th><th>Description</th><th>This example</th></tr>
</thead>
<tbody>
<tr><td><code>-d</code></td><td>To start a container in detached mode.</td><td>/</td></tr>
<tr><td><code>-it</code></td><td>Keep STDIN open even if not attached and allocate a terminal.</td><td>/</td></tr>
<tr><td><code>--rm</code></td><td>Remove the container automatically when it exits.</td><td>/</td></tr>
<tr><td><code>-name</code></td><td>Assign a name to the container.</td><td>This example specifies <em>pulsar-postgres</em> for the container.</td></tr>
<tr><td><code>-p</code></td><td>Publish the port of the container to the host.</td><td>This example publishes the port <em>5432</em> of the container to the host.</td></tr>
<tr><td><code>-e</code></td><td>Set environment variables.</td><td>This example sets the following variables:<br>- The password for the user is <em>password</em>.<br>- The name for the user is <em>postgres</em>.</td></tr>
</tbody>
</table>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-4"></a><a href="#tip-4" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about Docker commands, see <a href="https://docs.docker.com/engine/reference/commandline/run/">Docker CLI</a>.</p>
</blockquote></li>
<li><p>Check if PostgreSQL has been started successfully.</p>
<pre><code class="hljs css language-bash">$ docker logs -f pulsar-postgres
</code></pre>
<p>PostgreSQL has been started successfully if the following message appears.</p>
<pre><code class="hljs css language-text">2020-05-11 20:09:24.492 UTC [1] LOG: starting PostgreSQL 12.2 (Debian 12.2-2.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
2020-05-11 20:09:24.492 UTC [1] LOG: listening on IPv6 address "::", port 5432
2020-05-11 20:09:24.499 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
2020-05-11 20:09:24.523 UTC [55] LOG: database system was shut down at 2020-05-11 20:09:24 UTC
2020-05-11 20:09:24.533 UTC [1] LOG: database system is ready to accept connections
</code></pre></li>
<li><p>Access to PostgreSQL.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -it pulsar-postgres /bin/bash
</code></pre></li>
<li><p>Create a PostgreSQL table <em>pulsar_postgres_jdbc_sink</em>.</p>
<pre><code class="hljs css language-bash">$ psql -U postgres postgres
postgres=<span class="hljs-comment"># create table if not exists pulsar_postgres_jdbc_sink</span>
(
id serial PRIMARY KEY,
name VARCHAR(255) NOT NULL
);
</code></pre></li>
</ol>
<h3><a class="anchor" aria-hidden="true" id="configure-a-jdbc-sink"></a><a href="#configure-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure a JDBC sink</h3>
<p>Now we have a PostgreSQL running locally.</p>
<p>In this section, you need to configure a JDBC sink connector.</p>
<ol>
<li><p>Add a configuration file.</p>
<p>To run a JDBC sink connector, you need to prepare a YAML configuration file including the information that Pulsar connector runtime needs to know.</p>
<p>For example, how Pulsar connector can find the PostgreSQL cluster, what is the JDBC URL and the table that Pulsar connector uses for writing messages to.</p>
<p>Create a <em>pulsar-postgres-jdbc-sink.yaml</em> file, copy the following contents to this file, and place the file in the <code>pulsar/connectors</code> folder.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">configs:</span>
<span class="hljs-attr">userName:</span> <span class="hljs-string">"postgres"</span>
<span class="hljs-attr">password:</span> <span class="hljs-string">"password"</span>
<span class="hljs-attr">jdbcUrl:</span> <span class="hljs-string">"jdbc:postgresql://localhost:5432/postgres"</span>
<span class="hljs-attr">tableName:</span> <span class="hljs-string">"pulsar_postgres_jdbc_sink"</span>
</code></pre></li>
<li><p>Create a schema.</p>
<p>Create a <em>avro-schema</em> file, copy the following contents to this file, and place the file in the <code>pulsar/connectors</code> folder.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"type"</span>: <span class="hljs-string">"AVRO"</span>,
<span class="hljs-attr">"schema"</span>: <span class="hljs-string">"{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}"</span>,
<span class="hljs-attr">"properties"</span>: {}
}
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-5"></a><a href="#tip-5" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about AVRO, see <a href="https://avro.apache.org/docs/1.9.1/">Apache Avro</a>.</p>
</blockquote></li>
</ol>
<ol start="3">
<li><p>Upload a schema to a topic.</p>
<p>This example uploads the <em>avro-schema</em> schema to the <em>pulsar-postgres-jdbc-sink-topic</em> topic.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
</code></pre></li>
<li><p>Check if the schema has been uploaded successfully.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin schemas get pulsar-postgres-jdbc-sink-topic
</code></pre>
<p>The schema has been uploaded successfully if the following message appears.</p>
<pre><code class="hljs css language-json">{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"pulsar-postgres-jdbc-sink-topic"</span>,<span class="hljs-attr">"schema"</span>:<span class="hljs-string">"{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}"</span>,<span class="hljs-attr">"type"</span>:<span class="hljs-string">"AVRO"</span>,<span class="hljs-attr">"properties"</span>:{}}
</code></pre></li>
</ol>
<h3><a class="anchor" aria-hidden="true" id="create-a-jdbc-sink"></a><a href="#create-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create a JDBC sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to create a sink connector and perform other operations on it.</p>
<p>This example creates a sink connector and specifies the desired information.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks create \
--archive ./connectors/pulsar-io-jdbc-postgres-2.10.0.nar \
--inputs pulsar-postgres-jdbc-sink-topic \
--name pulsar-postgres-jdbc-sink \
--sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
--parallelism 1
</code></pre>
<p>Once the command is executed, Pulsar creates a sink connector <em>pulsar-postgres-jdbc-sink</em>.</p>
<p>This sink connector runs as a Pulsar Function and writes the messages produced in the topic <em>pulsar-postgres-jdbc-sink-topic</em> to the PostgreSQL table <em>pulsar_postgres_jdbc_sink</em>.</p>
<h4><a class="anchor" aria-hidden="true" id="tip-6"></a><a href="#tip-6" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<table>
<thead>
<tr><th>Flag</th><th>Description</th><th>This example</th></tr>
</thead>
<tbody>
<tr><td><code>--archive</code></td><td>The path to the archive file for the sink.</td><td><em>pulsar-io-jdbc-postgres-2.10.0.nar</em></td></tr>
<tr><td><code>--inputs</code></td><td>The input topic(s) of the sink. <br><br> Multiple topics can be specified as a comma-separated list.</td><td></td></tr>
<tr><td><code>--name</code></td><td>The name of the sink.</td><td><em>pulsar-postgres-jdbc-sink</em></td></tr>
<tr><td><code>--sink-config-file</code></td><td>The path to a YAML config file specifying the configuration of the sink.</td><td><em>pulsar-postgres-jdbc-sink.yaml</em></td></tr>
<tr><td><code>--parallelism</code></td><td>The parallelism factor of the sink. <br><br> For example, the number of sink instances to run.</td><td><em>1</em></td></tr>
</tbody>
</table>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-7"></a><a href="#tip-7" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks create options</code>, see <a href="/docs/en/next/io-cli#sinks">here</a>.</p>
</blockquote>
<p>The sink has been created successfully if the following message appears.</p>
<pre><code class="hljs css language-bash">Created successfully
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="inspect-a-jdbc-sink"></a><a href="#inspect-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inspect a JDBC sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to monitor a connector and perform other operations on it.</p>
<ul>
<li><p>List all running JDBC sink(s).</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks list \
--tenant public \
--namespace default
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-8"></a><a href="#tip-8" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks list options</code>, see <a href="/docs/en/next/io-cli/#list-1">here</a>.</p>
</blockquote>
<p>The result shows that only the <em>postgres-jdbc-sink</em> sink is running.</p>
<pre><code class="hljs css language-json">[
<span class="hljs-string">"pulsar-postgres-jdbc-sink"</span>
]
</code></pre></li>
<li><p>Get the information of a JDBC sink.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-9"></a><a href="#tip-9" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks get options</code>, see <a href="/docs/en/next/io-cli/#get-1">here</a>.</p>
</blockquote>
<p>The result shows the information of the sink connector, including tenant, namespace, topic and so on.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"tenant"</span>: <span class="hljs-string">"public"</span>,
<span class="hljs-attr">"namespace"</span>: <span class="hljs-string">"default"</span>,
<span class="hljs-attr">"name"</span>: <span class="hljs-string">"pulsar-postgres-jdbc-sink"</span>,
<span class="hljs-attr">"className"</span>: <span class="hljs-string">"org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink"</span>,
<span class="hljs-attr">"inputSpecs"</span>: {
<span class="hljs-attr">"pulsar-postgres-jdbc-sink-topic"</span>: {
<span class="hljs-attr">"isRegexPattern"</span>: <span class="hljs-literal">false</span>
}
},
<span class="hljs-attr">"configs"</span>: {
<span class="hljs-attr">"password"</span>: <span class="hljs-string">"password"</span>,
<span class="hljs-attr">"jdbcUrl"</span>: <span class="hljs-string">"jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink"</span>,
<span class="hljs-attr">"userName"</span>: <span class="hljs-string">"postgres"</span>,
<span class="hljs-attr">"tableName"</span>: <span class="hljs-string">"pulsar_postgres_jdbc_sink"</span>
},
<span class="hljs-attr">"parallelism"</span>: <span class="hljs-number">1</span>,
<span class="hljs-attr">"processingGuarantees"</span>: <span class="hljs-string">"ATLEAST_ONCE"</span>,
<span class="hljs-attr">"retainOrdering"</span>: <span class="hljs-literal">false</span>,
<span class="hljs-attr">"autoAck"</span>: <span class="hljs-literal">true</span>
}
</code></pre></li>
<li><p>Get the status of a JDBC sink</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks status \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-10"></a><a href="#tip-10" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks status options</code>, see <a href="/docs/en/next/io-cli/#status-1">here</a>.</p>
</blockquote>
<p>The result shows the current status of sink connector, including the number of instance, running status, worker ID and so on.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"numInstances"</span> : <span class="hljs-number">1</span>,
<span class="hljs-attr">"numRunning"</span> : <span class="hljs-number">1</span>,
<span class="hljs-attr">"instances"</span> : [ {
<span class="hljs-attr">"instanceId"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"status"</span> : {
<span class="hljs-attr">"running"</span> : <span class="hljs-literal">true</span>,
<span class="hljs-attr">"error"</span> : <span class="hljs-string">""</span>,
<span class="hljs-attr">"numRestarts"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"numReadFromPulsar"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"numSystemExceptions"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"latestSystemExceptions"</span> : [ ],
<span class="hljs-attr">"numSinkExceptions"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"latestSinkExceptions"</span> : [ ],
<span class="hljs-attr">"numWrittenToSink"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"lastReceivedTime"</span> : <span class="hljs-number">0</span>,
<span class="hljs-attr">"workerId"</span> : <span class="hljs-string">"c-standalone-fw-192.168.2.52-8080"</span>
}
} ]
}
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="stop-a-jdbc-sink"></a><a href="#stop-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Stop a JDBC sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to stop a connector and perform other operations on it.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks stop \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-11"></a><a href="#tip-11" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks stop options</code>, see <a href="/docs/en/next/io-cli/#stop-1">here</a>.</p>
</blockquote>
<p>The sink instance has been stopped successfully if the following message disappears.</p>
<pre><code class="hljs css language-bash">Stopped successfully
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="restart-a-jdbc-sink"></a><a href="#restart-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Restart a JDBC sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to restart a connector and perform other operations on it.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks restart \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-12"></a><a href="#tip-12" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks restart options</code>, see <a href="/docs/en/next/io-cli/#restart-1">here</a>.</p>
</blockquote>
<p>The sink instance has been started successfully if the following message disappears.</p>
<pre><code class="hljs css language-bash">Started successfully
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-13"></a><a href="#tip-13" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<ul>
<li><p>Optionally, you can run a standalone sink connector using <code>pulsar-admin sinks localrun options</code>.</p>
<p>Note that <code>pulsar-admin sinks localrun options</code> <strong>runs a sink connector locally</strong>, while <code>pulsar-admin sinks start options</code> <strong>starts a sink connector in a cluster</strong>.</p></li>
<li><p>For more information about <code>pulsar-admin sinks localrun options</code>, see <a href="/docs/en/next/io-cli#localrun-1">here</a>.</p></li>
</ul>
</blockquote>
<h3><a class="anchor" aria-hidden="true" id="update-a-jdbc-sink"></a><a href="#update-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Update a JDBC sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to update a connector and perform other operations on it.</p>
<p>This example updates the parallelism of the <em>pulsar-postgres-jdbc-sink</em> sink connector to 2.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks update \
--name pulsar-postgres-jdbc-sink \
--parallelism 2
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-14"></a><a href="#tip-14" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks update options</code>, see <a href="/docs/en/next/io-cli/#update-1">here</a>.</p>
</blockquote>
<p>The sink connector has been updated successfully if the following message disappears.</p>
<pre><code class="hljs css language-bash">Updated successfully
</code></pre>
<p>This example double-checks the information.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<p>The result shows that the parallelism is 2.</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"tenant"</span>: <span class="hljs-string">"public"</span>,
<span class="hljs-attr">"namespace"</span>: <span class="hljs-string">"default"</span>,
<span class="hljs-attr">"name"</span>: <span class="hljs-string">"pulsar-postgres-jdbc-sink"</span>,
<span class="hljs-attr">"className"</span>: <span class="hljs-string">"org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink"</span>,
<span class="hljs-attr">"inputSpecs"</span>: {
<span class="hljs-attr">"pulsar-postgres-jdbc-sink-topic"</span>: {
<span class="hljs-attr">"isRegexPattern"</span>: <span class="hljs-literal">false</span>
}
},
<span class="hljs-attr">"configs"</span>: {
<span class="hljs-attr">"password"</span>: <span class="hljs-string">"password"</span>,
<span class="hljs-attr">"jdbcUrl"</span>: <span class="hljs-string">"jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink"</span>,
<span class="hljs-attr">"userName"</span>: <span class="hljs-string">"postgres"</span>,
<span class="hljs-attr">"tableName"</span>: <span class="hljs-string">"pulsar_postgres_jdbc_sink"</span>
},
<span class="hljs-attr">"parallelism"</span>: <span class="hljs-number">2</span>,
<span class="hljs-attr">"processingGuarantees"</span>: <span class="hljs-string">"ATLEAST_ONCE"</span>,
<span class="hljs-attr">"retainOrdering"</span>: <span class="hljs-literal">false</span>,
<span class="hljs-attr">"autoAck"</span>: <span class="hljs-literal">true</span>
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="delete-a-jdbc-sink"></a><a href="#delete-a-jdbc-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delete a JDBC sink</h3>
<p>You can use the <a href="/docs/en/next/io-cli">Connector Admin CLI</a>
to delete a connector and perform other operations on it.</p>
<p>This example deletes the <em>pulsar-postgres-jdbc-sink</em> sink connector.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks delete \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip-15"></a><a href="#tip-15" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<p>For more information about <code>pulsar-admin sinks delete options</code>, see <a href="/docs/en/next/io-cli/#delete-1">here</a>.</p>
</blockquote>
<p>The sink connector has been deleted successfully if the following message appears.</p>
<pre><code class="hljs css language-text">Deleted successfully
</code></pre>
<p>This example double-checks the status of the sink connector.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin sinks get \
--tenant public \
--namespace default \
--name pulsar-postgres-jdbc-sink
</code></pre>
<p>The result shows that the sink connector does not exist.</p>
<pre><code class="hljs css language-text">HTTP 404 Not Found
Reason: Sink pulsar-postgres-jdbc-sink doesn't exist
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/next/io-overview"><span class="arrow-prev"></span><span>Overview</span></a><a class="docs-next button" href="/docs/en/next/io-use"><span>Use</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#install-pulsar-and-built-in-connector">Install Pulsar and built-in connector</a></li><li><a href="#start-pulsar-standalone">Start Pulsar standalone</a></li><li><a href="#connect-pulsar-to-cassandra">Connect Pulsar to Cassandra</a><ul class="toc-headings"><li><a href="#setup-a-cassandra-cluster">Setup a Cassandra cluster</a></li><li><a href="#configure-a-cassandra-sink">Configure a Cassandra sink</a></li><li><a href="#create-a-cassandra-sink">Create a Cassandra sink</a></li><li><a href="#inspect-a-cassandra-sink">Inspect a Cassandra sink</a></li><li><a href="#verify-a-cassandra-sink">Verify a Cassandra sink</a></li><li><a href="#delete-a-cassandra-sink">Delete a Cassandra Sink</a></li></ul></li><li><a href="#connect-pulsar-to-postgresql">Connect Pulsar to PostgreSQL</a><ul class="toc-headings"><li><a href="#setup-a-postgresql-cluster">Setup a PostgreSQL cluster</a></li><li><a href="#configure-a-jdbc-sink">Configure a JDBC sink</a></li><li><a href="#create-a-jdbc-sink">Create a JDBC sink</a></li><li><a href="#inspect-a-jdbc-sink">Inspect a JDBC sink</a></li><li><a href="#stop-a-jdbc-sink">Stop a JDBC sink</a></li><li><a href="#restart-a-jdbc-sink">Restart a JDBC sink</a></li><li><a href="#update-a-jdbc-sink">Update a JDBC sink</a></li><li><a href="#delete-a-jdbc-sink">Delete a JDBC sink</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>