blob: ae49bbae23b4fe9f8778077872a8efa312c12467 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Tutorial: Connecting Pulsar with Apache Cassandra · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code."/><meta name="docsearch:version" content="2.2.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Tutorial: Connecting Pulsar with Apache Cassandra · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div 
class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.2.1</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.2.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.2.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.2.1/io-quickstart">日本語</a></li><li><a href="/docs/fr/2.2.1/io-quickstart">Français</a></li><li><a href="/docs/ko/2.2.1/io-quickstart">한국어</a></li><li><a href="/docs/zh-CN/2.2.1/io-quickstart">中文</a></li><li><a href="/docs/zh-TW/2.2.1/io-quickstart">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
// Language switcher: clicking the "languages-menu" link shows or hides the
// "languages-dropdown" panel by swapping its class between "hide" and "visible".
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");

languagesMenuItem.addEventListener("click", (clickEvent) => {
  // The menu link's href is "#"; suppress the default navigation.
  clickEvent.preventDefault();

  // The whole className is replaced, not just one token toggled.
  const isHidden = languagesDropDown.className === "hide";
  languagesDropDown.className = isHidden ? "visible" : "hide";
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i></i><span>Pulsar IO</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.2.1/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/functions-metrics">Metrics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/io-overview">Overview</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.2.1/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/io-develop">Developing Connectors</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/sql-overview">Overview</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.2.1/sql-getting-started">Getting Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/administration-load-distribution">Load distribution</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.2.1/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.2.1/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/cookbooks-message-queue">Message queue</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.2.1/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.1/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script>
// Collapsible sidebar categories.
// For every element with class "collapsible":
//   - the first one whose following sibling contains a "navListItemActive"
//     descendant gets its sibling's "hide" class and its own arrow's "rotate"
//     class toggled once at load time;
//   - a click listener is attached that toggles the same two classes.
var coll = document.getElementsByClassName('collapsible');
// Stays true until the category holding the active nav item has been handled.
var checkActiveCategory = true;

Array.prototype.forEach.call(coll, function(categoryHeader) {
  var descendants = categoryHeader.nextElementSibling.getElementsByTagName('*');

  if (checkActiveCategory) {
    // some() stops at the first match, like the early exit of a manual scan.
    var holdsActiveItem = Array.prototype.some.call(descendants, function(node) {
      return node.classList.contains('navListItemActive');
    });

    if (holdsActiveItem) {
      categoryHeader.nextElementSibling.classList.toggle('hide');
      // childNodes[1] is treated as the arrow element of the category header.
      categoryHeader.childNodes[1].classList.toggle('rotate');
      checkActiveCategory = false;
    }
  }

  categoryHeader.addEventListener('click', function() {
    // `this` is the clicked category header.
    this.childNodes[1].classList.toggle('rotate');
    this.nextElementSibling.classList.toggle('hide');
  });
});
// Once the DOM is parsed, wire up the mobile navigation togglers and make the
// on-page table of contents close itself when one of its links is followed.
document.addEventListener('DOMContentLoaded', function() {
  // #navToggler (hamburger menu) slides the docs navigation (#docsNav) in/out.
  createToggler('#navToggler', '#docsNav', 'docsSliderActive');
  // #tocToggler shows/hides the table of contents via a class on <body>.
  createToggler('#tocToggler', 'body', 'tocActive');

  var headings = document.querySelector('.toc-headings');
  headings && headings.addEventListener('click', function(event) {
    // Walk up from the clicked node towards the container; if an anchor is
    // found on the way, a TOC link was clicked, so close the TOC.
    var el = event.target;
    // The `el &&` guard stops the walk if the chain of parents ends before
    // reaching the container, instead of dereferencing null.
    while (el && el !== headings) {
      if (el.tagName === 'A') {
        document.body.classList.remove('tocActive');
        break;
      }
      el = el.parentNode;
    }
  }, false);

  /**
   * Makes a click on one element toggle a CSS class on another element.
   *
   * @param {string} togglerSelector - CSS selector of the element to click.
   * @param {string} targetSelector - CSS selector of the element whose class
   *   list is toggled.
   * @param {string} className - Class name toggled on the target.
   *
   * Does nothing when either selector matches no element, so a page that
   * lacks one of the two elements does not throw on click.
   */
  function createToggler(togglerSelector, targetSelector, className) {
    var toggler = document.querySelector(togglerSelector);
    var target = document.querySelector(targetSelector);
    if (!toggler || !target) {
      return;
    }
    toggler.onclick = function(event) {
      event.preventDefault();
      target.classList.toggle(className);
    };
  }
});
</script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-quickstart.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Tutorial: Connecting Pulsar with Apache Cassandra</h1></header><article><div><span><p>This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code.
It is helpful to review the <a href="/docs/en/2.2.1/io-overview">concepts</a> for Pulsar I/O in tandem with running the steps in this guide
to gain a deeper understanding. At the end of this tutorial, you will be able to:</p>
<ul>
<li>Connect your Pulsar cluster with your Cassandra cluster</li>
</ul>
<blockquote>
<h4><a class="anchor" aria-hidden="true" id="tip"></a><a href="#tip" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4>
<ol>
<li><p>These instructions assume you are running Pulsar in <a href="/docs/en/2.2.1/getting-started-standalone">standalone mode</a>. However, all
the commands used in this tutorial should also work in a multi-node Pulsar cluster without any changes.</p></li>
<li><p>All the instructions assume that you run them from the root directory of a Pulsar binary distribution.</p></li>
</ol>
</blockquote>
<h2><a class="anchor" aria-hidden="true" id="installing-pulsar"></a><a href="#installing-pulsar" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installing Pulsar</h2>
<p>To get started running Pulsar, download a binary tarball release in one of the following ways:</p>
<ul>
<li><p>by clicking the link below and downloading the release from an Apache mirror:</p>
<ul>
<li><a href="https://archive.apache.org/dist/pulsar/pulsar-2.2.1/apache-pulsar-2.2.1-bin.tar.gz" download>Pulsar 2.2.1 binary release</a></li>
</ul></li>
<li><p>from the Pulsar <a href="/download">downloads page</a></p></li>
<li><p>from the Pulsar <a href="https://github.com/apache/pulsar/releases/latest">releases page</a></p></li>
<li><p>using <a href="https://www.gnu.org/software/wget">wget</a>:</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> wget https://archive.apache.org/dist/pulsar/pulsar-2.2.1/apache-pulsar-2.2.1-bin.tar.gz</span>
</code></pre></li>
</ul>
<p>Once the tarball is downloaded, untar it and <code>cd</code> into the resulting directory:</p>
<pre><code class="hljs css language-bash">$ tar xvfz apache-pulsar-2.2.1-bin.tar.gz
$ <span class="hljs-built_in">cd</span> apache-pulsar-2.2.1
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="installing-builtin-connectors"></a><a href="#installing-builtin-connectors" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installing Builtin Connectors</h2>
<p>Since release <code>2.1.0-incubating</code>, Pulsar releases a separate binary distribution, containing all the <code>builtin</code> connectors.
If you would like to enable those <code>builtin</code> connectors, you can download the connectors tarball release in one of the following ways:</p>
<ul>
<li><p>by clicking the link below and downloading the release from an Apache mirror:</p>
<ul>
<li><a href="https://archive.apache.org/dist/pulsar/pulsar-2.2.1/apache-pulsar-io-connectors-2.2.1-bin.tar.gz" download>Pulsar IO Connectors 2.2.1 release</a></li>
</ul></li>
<li><p>from the Pulsar <a href="/download">downloads page</a></p></li>
<li><p>from the Pulsar <a href="https://github.com/apache/pulsar/releases/latest">releases page</a></p></li>
<li><p>using <a href="https://www.gnu.org/software/wget">wget</a>:</p>
<pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> wget https://archive.apache.org/dist/pulsar/pulsar-2.2.1/apache-pulsar-io-connectors-2.2.1-bin.tar.gz</span>
</code></pre></li>
</ul>
<p>Once the tarball is downloaded, in the pulsar directory, untar the io-connectors package and copy the connectors as <code>connectors</code>
in the pulsar directory:</p>
<pre><code class="hljs css language-bash">$ tar xvfz /path/to/apache-pulsar-io-connectors-2.2.1-bin.tar.gz
// you will find a directory named `apache-pulsar-io-connectors-2.2.1` <span class="hljs-keyword">in</span> the pulsar directory
// <span class="hljs-keyword">then</span> copy the connectors
$ cp -r apache-pulsar-io-connectors-2.2.1/connectors connectors
$ ls connectors
pulsar-io-aerospike-2.2.1.nar
pulsar-io-cassandra-2.2.1.nar
pulsar-io-kafka-2.2.1.nar
pulsar-io-kinesis-2.2.1.nar
pulsar-io-rabbitmq-2.2.1.nar
pulsar-io-twitter-2.2.1.nar
...
</code></pre>
<h2><a class="anchor" aria-hidden="true" id="start-pulsar-service"></a><a href="#start-pulsar-service" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start Pulsar Service</h2>
<pre><code class="hljs css language-bash">bin/pulsar standalone
</code></pre>
<p>All the components of a Pulsar service will start in order. You can curl the Pulsar service endpoints below to make sure the Pulsar service is up and running correctly.</p>
<ol>
<li>Check pulsar binary protocol port.</li>
</ol>
<pre><code class="hljs css language-bash">telnet localhost 6650
</code></pre>
<ol start="2">
<li>Check pulsar function cluster</li>
</ol>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/worker/cluster
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">[{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
</code></pre>
<ol start="3">
<li>Make sure public tenant and default namespace exist</li>
</ol>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/namespaces/public
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">["public/default","public/functions"]
</code></pre>
<ol start="4">
<li>All builtin connectors should be listed as available.</li>
</ol>
<pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/<span class="hljs-built_in">functions</span>/connectors
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-json">[{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"aerospike"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Aerospike database sink"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.aerospike.AerospikeStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"cassandra"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Writes data into Cassandra"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.cassandra.CassandraStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kafka"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kafka source and sink connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaStringSource"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaBytesSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kinesis"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kinesis sink connector"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kinesis.KinesisSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"rabbitmq"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"RabbitMQ source connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.rabbitmq.RabbitMQSource"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"twitter"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Ingest data from Twitter firehose"</span>,<span class="hljs-attr">"sourceClass"</span>:<span 
class="hljs-string">"org.apache.pulsar.io.twitter.TwitterFireHose"</span>}]
</code></pre>
<p>If an error occurs while starting the Pulsar service, you may see an exception in the terminal where you are running <code>bin/pulsar standalone</code>,
or you can navigate to the <code>logs</code> directory under the Pulsar directory to view the logs.</p>
<h2><a class="anchor" aria-hidden="true" id="connect-pulsar-to-apache-cassandra"></a><a href="#connect-pulsar-to-apache-cassandra" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connect Pulsar to Apache Cassandra</h2>
<blockquote>
<p>Make sure you have Docker available on your laptop. If you don't have Docker installed, you can follow the <a href="https://docs.docker.com/docker-for-mac/install/">installation instructions</a>.</p>
</blockquote>
<p>We are using the <code>cassandra</code> Docker image to start a single-node Cassandra cluster in Docker.</p>
<h3><a class="anchor" aria-hidden="true" id="setup-the-cassandra-cluster"></a><a href="#setup-the-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Setup the Cassandra Cluster</h3>
<h4><a class="anchor" aria-hidden="true" id="start-a-cassandra-cluster"></a><a href="#start-a-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start a Cassandra Cluster</h4>
<pre><code class="hljs css language-bash">docker run -d --rm --name=cassandra -p 9042:9042 cassandra
</code></pre>
<p>Before moving on to the next steps, make sure the Cassandra cluster is up and running.</p>
<ol>
<li>Make sure the docker process is running.</li>
</ol>
<pre><code class="hljs css language-bash">docker ps
</code></pre>
<ol start="2">
<li>Check the cassandra logs to make sure cassandra process is running as expected.</li>
</ol>
<pre><code class="hljs css language-bash">docker logs cassandra
</code></pre>
<ol start="3">
<li>Check the cluster status</li>
</ol>
<pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> cassandra nodetool status
</code></pre>
<p>Example output:</p>
<pre><code class="hljs">Datacenter: datacenter1
=======================
<span class="hljs-attribute">Status</span>=Up/Down
|/ <span class="hljs-attribute">State</span>=Normal/Leaving/Joining/Moving
-- <span class="hljs-built_in"> Address </span> Load Tokens Owns (effective) Host ID Rack
UN 172.17.0.2 103.67 KiB 256 100.0% af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26 rack1
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="create-keyspace-and-table"></a><a href="#create-keyspace-and-table" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create keyspace and table</h4>
<p>We are using <code>cqlsh</code> to connect to the Cassandra cluster and create a keyspace and a table.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Use HELP <span class="hljs-keyword">for</span> <span class="hljs-built_in">help</span>.
cqlsh&gt;
</code></pre>
<p>All the following commands are executed in <code>cqlsh</code>.</p>
<h5><a class="anchor" aria-hidden="true" id="create-keyspace-pulsar_test_keyspace"></a><a href="#create-keyspace-pulsar_test_keyspace" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create keyspace <code>pulsar_test_keyspace</code></h5>
<pre><code class="hljs css language-bash">cqlsh&gt; CREATE KEYSPACE pulsar_test_keyspace WITH replication = {<span class="hljs-string">'class'</span>:<span class="hljs-string">'SimpleStrategy'</span>, <span class="hljs-string">'replication_factor'</span>:1};
</code></pre>
<h5><a class="anchor" aria-hidden="true" id="create-table-pulsar_test_table"></a><a href="#create-table-pulsar_test_table" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create table <code>pulsar_test_table</code></h5>
<pre><code class="hljs css language-bash">cqlsh&gt; USE pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace&gt; CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="configure-a-cassandra-sink"></a><a href="#configure-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure a Cassandra Sink</h3>
<p>Now that we have a Cassandra cluster running locally, we can configure a Cassandra sink connector.
The Cassandra sink connector will read messages from a Pulsar topic and write the messages into a Cassandra table.</p>
<p>In order to run a Cassandra sink connector, you need to prepare a YAML config file containing the information that the Pulsar IO
runtime needs to know: for example, how Pulsar IO can find the Cassandra cluster, and which keyspace and table
Pulsar IO will write Pulsar messages to.</p>
<p>Create a file <code>examples/cassandra-sink.yml</code> and edit it to fill in the following content:</p>
<pre><code class="hljs"><span class="hljs-symbol">configs:</span>
<span class="hljs-symbol"> roots:</span> <span class="hljs-string">"localhost:9042"</span>
<span class="hljs-symbol"> keyspace:</span> <span class="hljs-string">"pulsar_test_keyspace"</span>
<span class="hljs-symbol"> columnFamily:</span> <span class="hljs-string">"pulsar_test_table"</span>
<span class="hljs-symbol"> keyname:</span> <span class="hljs-string">"key"</span>
<span class="hljs-symbol"> columnName:</span> <span class="hljs-string">"col"</span>
</code></pre>
<p>To learn more about Cassandra Connector, see <a href="/docs/en/2.2.1/io-cassandra">Cassandra Connector</a>.</p>
<h3><a class="anchor" aria-hidden="true" id="submit-a-cassandra-sink"></a><a href="#submit-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Submit a Cassandra Sink</h3>
<p>Pulsar provides the <a href="/docs/en/2.2.1/reference-cli-tools">CLI</a> for running and managing Pulsar I/O connectors.</p>
<p>We can run the following command to create a sink connector with type <code>cassandra</code> and config file <code>examples/cassandra-sink.yml</code>.</p>
<h4><a class="anchor" aria-hidden="true" id="note"></a><a href="#note" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Note</h4>
<blockquote>
<p>The <code>sink-type</code> parameter of the currently built-in connectors is determined by the setting of the <code>name</code> parameter specified in the pulsar-io.yaml file.</p>
</blockquote>
<pre><code class="hljs css language-shell">bin/pulsar-admin sink create \
--tenant public \
--namespace default \
--name cassandra-test-sink \
--sink-type cassandra \
--sink-config-file examples/cassandra-sink.yml \
--inputs test_cassandra
</code></pre>
<p>Once the command is executed, Pulsar will create a sink connector named <code>cassandra-test-sink</code> and the sink connector will be running
as a Pulsar Function and write the messages produced in topic <code>test_cassandra</code> to Cassandra table <code>pulsar_test_table</code>.</p>
<h3><a class="anchor" aria-hidden="true" id="inspect-the-cassandra-sink"></a><a href="#inspect-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inspect the Cassandra Sink</h3>
<p>Since an IO connector is running as <a href="/docs/en/2.2.1/functions-overview">Pulsar Functions</a>, you can use <a href="/docs/en/2.2.1/reference-pulsar-admin#functions">functions CLI</a>
for inspecting and managing the IO connectors.</p>
<h4><a class="anchor" aria-hidden="true" id="retrieve-sink-info"></a><a href="#retrieve-sink-info" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Retrieve Sink Info</h4>
<pre><code class="hljs css language-bash">bin/pulsar-admin <span class="hljs-built_in">functions</span> get \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">{
"tenant": "public",
"namespace": "default",
"name": "cassandra-test-sink",
"className": "org.apache.pulsar.functions.api.utils.IdentityFunction",
"autoAck": true,
"parallelism": 1,
"source": {
"topicsToSerDeClassName": {
"test_cassandra": ""
}
},
"sink": {
"configs": "{\"roots\":\"localhost:9042\",\"keyspace\":\"pulsar_test_keyspace\",\"columnFamily\":\"pulsar_test_table\",\"keyname\":\"key\",\"columnName\":\"col\"}",
"builtin": "cassandra"
},
"resources": {}
}
</code></pre>
<h4><a class="anchor" aria-hidden="true" id="check-sink-running-status"></a><a href="#check-sink-running-status" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Check Sink Running Status</h4>
<pre><code class="hljs css language-bash">bin/pulsar-admin <span class="hljs-built_in">functions</span> getstatus \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">{
"functionStatusList": [
{
"running": true,
"instanceId": "0",
"metrics": {
"metrics": {
"__total_processed__": {},
"__total_successfully_processed__": {},
"__total_system_exceptions__": {},
"__total_user_exceptions__": {},
"__total_serialization_exceptions__": {},
"__avg_latency_ms__": {}
}
},
"workerId": "c-standalone-fw-localhost-6750"
}
]
}
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="verify-the-cassandra-sink"></a><a href="#verify-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Verify the Cassandra Sink</h3>
<p>Now let's produce some messages to the input topic of the Cassandra sink <code>test_cassandra</code>.</p>
<pre><code class="hljs css language-bash"><span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> {0..9}; <span class="hljs-keyword">do</span> bin/pulsar-client produce -m <span class="hljs-string">"key-<span class="hljs-variable">$i</span>"</span> -n 1 test_cassandra; <span class="hljs-keyword">done</span>
</code></pre>
<p>Inspect the sink running status again. You should see that 10 messages have been processed by the Cassandra sink.</p>
<pre><code class="hljs css language-bash">bin/pulsar-admin <span class="hljs-built_in">functions</span> getstatus \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
<p>Example output:</p>
<pre><code class="hljs css language-shell">{
"functionStatusList": [
{
"running": true,
"numProcessed": "11",
"numSuccessfullyProcessed": "11",
"lastInvocationTime": "1532031040117",
"instanceId": "0",
"metrics": {
"metrics": {
"__total_processed__": {
"count": 5.0,
"sum": 5.0,
"max": 5.0
},
"__total_successfully_processed__": {
"count": 5.0,
"sum": 5.0,
"max": 5.0
},
"__total_system_exceptions__": {},
"__total_user_exceptions__": {},
"__total_serialization_exceptions__": {},
"__avg_latency_ms__": {}
}
},
"workerId": "c-standalone-fw-localhost-6750"
}
]
}
</code></pre>
<p>Finally, let's inspect the results in Cassandra using <code>cqlsh</code>:</p>
<pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost
</code></pre>
<p>Select the rows from the Cassandra table <code>pulsar_test_table</code>:</p>
<pre><code class="hljs css language-bash">cqlsh&gt; use pulsar_test_keyspace;
cqlsh:pulsar_test_keyspace&gt; select * from pulsar_test_table;
key | col
--------+--------
key-5 | key-5
key-0 | key-0
key-9 | key-9
key-2 | key-2
key-1 | key-1
key-3 | key-3
key-6 | key-6
key-7 | key-7
key-4 | key-4
key-8 | key-8
</code></pre>
<h3><a class="anchor" aria-hidden="true" id="delete-the-cassandra-sink"></a><a href="#delete-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delete the Cassandra Sink</h3>
<pre><code class="hljs css language-shell">bin/pulsar-admin sink delete \
--tenant public \
--namespace default \
--name cassandra-test-sink
</code></pre>
</span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.2.1/io-overview"><span class="arrow-prev"></span><span>Overview</span></a><a class="docs-next button" href="/docs/en/2.2.1/io-managing"><span>Next</span><span class="arrow-next"></span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#installing-pulsar">Installing Pulsar</a></li><li><a href="#installing-builtin-connectors">Installing Builtin Connectors</a></li><li><a href="#start-pulsar-service">Start Pulsar Service</a></li><li><a href="#connect-pulsar-to-apache-cassandra">Connect Pulsar to Apache Cassandra</a><ul class="toc-headings"><li><a href="#setup-the-cassandra-cluster">Setup the Cassandra Cluster</a></li><li><a href="#configure-a-cassandra-sink">Configure a Cassandra Sink</a></li><li><a href="#submit-a-cassandra-sink">Submit a Cassandra Sink</a></li><li><a href="#inspect-the-cassandra-sink">Inspect the Cassandra Sink</a></li><li><a href="#verify-the-cassandra-sink">Verify the Cassandra Sink</a></li><li><a href="#delete-the-cassandra-sink">Delete the Cassandra Sink</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
// Builds the "Community" dropdown in the site navigation.
//
// The script looks up the navigation link whose href is "#community",
// replaces the contents of that link's parent node with the dropdown markup
// below, and then toggles the dropdown between the "hide" and "visible"
// classes each time the "Community" entry is clicked.

// querySelector returns null when no element matches; keep the lookup separate
// so a page without the "#community" link does not throw and abort the script.
const communityAnchor = document.querySelector("a[href='#community']");
const community = communityAnchor ? communityAnchor.parentNode : null;

// External links open in a new tab, so they carry rel="noopener noreferrer"
// to keep the opened page from reaching back through window.opener.
// NOTE(review): `community` is presumably the nav <li>; injecting another
// <li> into it nests list items — confirm against the header markup.
const communityMenu =
  '<li>' +
  '<a id="community-menu" href="#" aria-expanded="false">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
  '<div id="community-dropdown" class="hide">' +
    '<ul id="community-dropdown-items">' +
      '<li><a href="/en/contact">Contact</a></li>' +
      '<li><a href="/en/contributing">Contributing</a></li>' +
      '<li><a href="/en/coding-guide">Coding guide</a></li>' +
      '<li><a href="/en/events">Events</a></li>' +
      '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank" rel="noopener noreferrer">Twitter &#x2750;</a></li>' +
      '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank" rel="noopener noreferrer">Wiki &#x2750;</a></li>' +
      '<li><a href="https://github.com/apache/pulsar/issues" target="_blank" rel="noopener noreferrer">Issue tracking &#x2750;</a></li>' +
      '<li><a href="https://pulsar-summit.org/" target="_blank" rel="noopener noreferrer">Pulsar Summit &#x2750;</a></li>' +
      '<li>&nbsp;</li>' +
      '<li><a href="/en/resources">Resources</a></li>' +
      '<li><a href="/en/team">Team</a></li>' +
      '<li><a href="/en/powered-by">Powered By</a></li>' +
    '</ul>' +
  '</div>' +
  '</li>';

if (community) {
  community.innerHTML = communityMenu;
}

// These elements only exist once the markup above has been injected.
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");

if (communityMenuItem && communityDropDown) {
  communityMenuItem.addEventListener("click", function(event) {
    // The menu entry links to "#"; stop the browser from jumping to the top.
    event.preventDefault();
    const opening = communityDropDown.className === 'hide';
    communityDropDown.className = opening ? 'visible' : 'hide';
    // Keep the announced state in sync with what is visually shown.
    communityMenuItem.setAttribute('aria-expanded', String(opening));
  });
}
</script></span></footer></div><!-- Twitter widgets loader (vendor snippet, kept verbatim). If no element with id "twitter-wjs" exists yet, it creates a script element with that id pointing at https://platform.twitter.com/widgets.js and inserts it before the first script element in the document. It assigns window.twttr an object whose ready(f) method queues callbacks in its _e array; an existing window.twttr object is reused. --><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>