| <!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Tutorial: Connecting Pulsar with Apache Cassandra · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code."/><meta name="docsearch:version" content="2.2.0"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Tutorial: Connecting Pulsar with Apache Cassandra · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code."/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div 
class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.2.0</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class="siteNavGroupActive"><a href="/docs/en/2.2.0/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class="siteNavGroupActive"><a href="/docs/en/2.2.0/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.2.0/io-quickstart">日本語</a></li><li><a href="/docs/fr/2.2.0/io-quickstart">Français</a></li><li><a href="/docs/ko/2.2.0/io-quickstart">한국어</a></li><li><a href="/docs/zh-CN/2.2.0/io-quickstart">中文</a></li><li><a href="/docs/zh-TW/2.2.0/io-quickstart">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script> |
| const languagesMenuItem = document.getElementById("languages-menu"); |
| const languagesDropDown = document.getElementById("languages-dropdown"); |
| languagesMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (languagesDropDown.className == "hide") { |
| languagesDropDown.className = "visible"; |
| } else { |
| languagesDropDown.className = "hide"; |
| } |
| }); |
| </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="docsNavContainer" id="docsNav"><nav class="toc"><div class="toggleNav"><section class="navWrapper wrapper"><div class="navBreadcrumb wrapper"><div class="navToggle" id="navToggler"><div class="hamburger-menu"><div class="line1"></div><div class="line2"></div><div class="line3"></div></div></div><h2><i>›</i><span>Pulsar IO</span></h2><div class="tocToggler" id="tocToggler"><i class="icon-toc"></i></div></div><div class="navGroups"><div class="navGroup"><h3 class="navGroupCategoryTitle">Getting Started</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/pulsar-2.0">Pulsar 2.0</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/getting-started-standalone">Run Pulsar locally</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/getting-started-docker">Pulsar in Docker</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/client-libraries">Client libraries</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Concepts and Architecture</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-messaging">Messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-architecture-overview">Architecture</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-clients">Clients</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-replication">Geo Replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-multi-tenancy">Multi Tenancy</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-authentication">Authentication and Authorization</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.2.0/concepts-topic-compaction">Topic Compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/concepts-schema-registry">Schema Registry</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar Functions</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-api">API</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-deploying">Deploying functions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-guarantees">Processing guarantees</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-state">State Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/functions-metrics">Metrics</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar IO</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/io-overview">Overview</a></li><li class="navListItem navListItemActive"><a class="navItem" href="/docs/en/2.2.0/io-quickstart">Getting started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/io-managing">Managing Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/io-connectors">Builtin Connectors</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/io-develop">Developing Connectors</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Pulsar SQL</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/sql-overview">Overview</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.2.0/sql-getting-started">Getting Started</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/sql-deployment-configurations">Deployment and Configuration</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Deployment</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/deploy-aws">Amazon Web Services</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/deploy-kubernetes">Kubernetes</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/deploy-bare-metal">Bare metal</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/deploy-bare-metal-multi-cluster">Bare metal multi-cluster</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/deploy-monitoring">Monitoring</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Administration</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/administration-zk-bk">ZooKeeper and BookKeeper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/administration-geo">Geo-replication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/administration-dashboard">Dashboard</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/administration-stats">Pulsar statistics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/administration-load-distribution">Load distribution</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/administration-proxy">Pulsar proxy</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Security</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/security-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/security-tls-transport">Transport Encryption using TLS</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.2.0/security-tls-authentication">Authentication using TLS</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/security-athenz">Authentication using Athenz</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/security-authorization">Authorization and ACLs</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/security-encryption">End-to-End Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/security-extending">Extending</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Client Libraries</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/client-libraries-java">Java</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/client-libraries-go">Go</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/client-libraries-python">Python</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/client-libraries-cpp">C++</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/client-libraries-websocket">WebSocket</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Admin API</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-overview">Overview</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-clusters">Clusters</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-tenants">Tenants</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-brokers">Brokers</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-namespaces">Namespaces</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-permissions">Permissions</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-persistent-topics">Persistent topics</a></li><li class="navListItem"><a 
class="navItem" href="/docs/en/2.2.0/admin-api-non-persistent-topics">Non-Persistent topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-partitioned-topics">Partitioned topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/admin-api-schemas">Schemas</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Adaptors</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/adaptors-kafka">Kafka client wrapper</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/adaptors-spark">Apache Spark</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/adaptors-storm">Apache Storm</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Cookbooks</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-tiered-storage">Tiered Storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-compaction">Topic compaction</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-deduplication">Message deduplication</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-non-persistent">Non-persistent messaging</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-partitioned">Partitioned Topics</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-retention-expiry">Message retention and expiry</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-encryption">Encryption</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/cookbooks-message-queue">Message queue</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Development</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/develop-tools">Simulation tools</a></li><li class="navListItem"><a class="navItem" 
href="/docs/en/2.2.0/developing-binary-protocol">Binary protocol</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/develop-schema">Custom schema storage</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/develop-load-manager">Modular load manager</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/develop-cpp">Building Pulsar C++ client</a></li></ul></div><div class="navGroup"><h3 class="navGroupCategoryTitle">Reference</h3><ul class=""><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/reference-terminology">Terminology</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/reference-cli-tools">Pulsar CLI tools</a></li><li class="navListItem"><a class="navItem" href="/docs/en/2.2.0/reference-configuration">Pulsar configuration</a></li></ul></div></div></section></div><script> |
| var coll = document.getElementsByClassName('collapsible'); |
| var checkActiveCategory = true; |
| for (var i = 0; i < coll.length; i++) { |
| var links = coll[i].nextElementSibling.getElementsByTagName('*'); |
| if (checkActiveCategory){ |
| for (var j = 0; j < links.length; j++) { |
| if (links[j].classList.contains('navListItemActive')){ |
| coll[i].nextElementSibling.classList.toggle('hide'); |
| coll[i].childNodes[1].classList.toggle('rotate'); |
| checkActiveCategory = false; |
| break; |
| } |
| } |
| } |
| |
| coll[i].addEventListener('click', function() { |
| var arrow = this.childNodes[1]; |
| arrow.classList.toggle('rotate'); |
| var content = this.nextElementSibling; |
| content.classList.toggle('hide'); |
| }); |
| } |
| |
| document.addEventListener('DOMContentLoaded', function() { |
| createToggler('#navToggler', '#docsNav', 'docsSliderActive'); |
| createToggler('#tocToggler', 'body', 'tocActive'); |
| |
| var headings = document.querySelector('.toc-headings'); |
| headings && headings.addEventListener('click', function(event) { |
| var el = event.target; |
| while(el !== headings){ |
| if (el.tagName === 'A') { |
| document.body.classList.remove('tocActive'); |
| break; |
| } else{ |
| el = el.parentNode; |
| } |
| } |
| }, false); |
| |
| function createToggler(togglerSelector, targetSelector, className) { |
| var toggler = document.querySelector(togglerSelector); |
| var target = document.querySelector(targetSelector); |
| |
| if (!toggler) { |
| return; |
| } |
| |
| toggler.onclick = function(event) { |
| event.preventDefault(); |
| |
| target.classList.toggle(className); |
| }; |
| } |
| }); |
| </script></nav></div><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-quickstart.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Tutorial: Connecting Pulsar with Apache Cassandra</h1></header><article><div><span><p>This tutorial provides a hands-on look at how you can move data out of Pulsar without writing a single line of code. |
| It is helpful to review the <a href="/docs/en/2.2.0/io-overview">concepts</a> for Pulsar I/O in tandem with running the steps in this guide |
| to gain a deeper understanding. At the end of this tutorial, you will be able to:</p> |
| <ul> |
| <li>Connect your Pulsar cluster with your Cassandra cluster</li> |
| </ul> |
| <blockquote> |
| <h4><a class="anchor" aria-hidden="true" id="tip"></a><a href="#tip" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Tip</h4> |
| <ol> |
| <li><p>These instructions assume you are running Pulsar in <a href="/docs/en/2.2.0/getting-started-standalone">standalone mode</a>. However, all |
| the commands used in this tutorial can also be used in a multi-node Pulsar cluster without any changes.</p></li> |
| <li><p>All the instructions assume that you run them from the root directory of a Pulsar binary distribution.</p></li> |
| </ol> |
| </blockquote> |
| <h2><a class="anchor" aria-hidden="true" id="installing-pulsar"></a><a href="#installing-pulsar" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installing Pulsar</h2> |
| <p>To get started running Pulsar, download a binary tarball release in one of the following ways:</p> |
| <ul> |
| <li><p>by clicking the link below and downloading the release from an Apache mirror:</p> |
| <ul> |
| <li><a href="https://archive.apache.org/dist/pulsar/pulsar-2.2.0/apache-pulsar-2.2.0-bin.tar.gz" download>Pulsar 2.2.0 binary release</a></li> |
| </ul></li> |
| <li><p>from the Pulsar <a href="/download">downloads page</a></p></li> |
| <li><p>from the Pulsar <a href="https://github.com/apache/pulsar/releases/latest">releases page</a></p></li> |
| <li><p>using <a href="https://www.gnu.org/software/wget">wget</a>:</p> |
| <pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> wget https://archive.apache.org/dist/pulsar/pulsar-2.2.0/apache-pulsar-2.2.0-bin.tar.gz</span> |
| </code></pre></li> |
| </ul> |
| <p>Once the tarball is downloaded, untar it and <code>cd</code> into the resulting directory:</p> |
| <pre><code class="hljs css language-bash">$ tar xvfz apache-pulsar-2.2.0-bin.tar.gz |
| $ <span class="hljs-built_in">cd</span> apache-pulsar-2.2.0 |
| </code></pre> |
| <h2><a class="anchor" aria-hidden="true" id="installing-builtin-connectors"></a><a href="#installing-builtin-connectors" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Installing Builtin Connectors</h2> |
| <p>Since release <code>2.1.0-incubating</code>, Pulsar has shipped a separate binary distribution containing all the <code>builtin</code> connectors. |
| If you would like to enable those <code>builtin</code> connectors, you can download the connectors tarball in one of the following ways:</p> |
| <ul> |
| <li><p>by clicking the link below and downloading the release from an Apache mirror:</p> |
| <ul> |
| <li><a href="https://archive.apache.org/dist/pulsar/pulsar-2.2.0/apache-pulsar-io-connectors-2.2.0-bin.tar.gz" download>Pulsar IO Connectors 2.2.0 release</a></li> |
| </ul></li> |
| <li><p>from the Pulsar <a href="/download">downloads page</a></p></li> |
| <li><p>from the Pulsar <a href="https://github.com/apache/pulsar/releases/latest">releases page</a></p></li> |
| <li><p>using <a href="https://www.gnu.org/software/wget">wget</a>:</p> |
| <pre><code class="hljs css language-shell"><span class="hljs-meta">$</span><span class="bash"> wget https://archive.apache.org/dist/pulsar/pulsar-2.2.0/apache-pulsar-io-connectors-2.2.0-bin.tar.gz</span> |
| </code></pre></li> |
| </ul> |
| <p>Once the tarball is downloaded, untar the io-connectors package in the Pulsar directory and copy its connectors into a directory named <code>connectors</code> |
| in the Pulsar directory:</p> |
| <pre><code class="hljs css language-bash">$ tar xvfz /path/to/apache-pulsar-io-connectors-2.2.0-bin.tar.gz |
| |
| <span class="hljs-comment"># you will find a directory named `apache-pulsar-io-connectors-2.2.0` in the pulsar directory</span> |
| <span class="hljs-comment"># then copy the connectors</span> |
| |
| $ cp -r apache-pulsar-io-connectors-2.2.0/connectors connectors |
| |
| $ ls connectors |
| pulsar-io-aerospike-2.2.0.nar |
| pulsar-io-cassandra-2.2.0.nar |
| pulsar-io-kafka-2.2.0.nar |
| pulsar-io-kinesis-2.2.0.nar |
| pulsar-io-rabbitmq-2.2.0.nar |
| pulsar-io-twitter-2.2.0.nar |
| ... |
| </code></pre> |
| <h2><a class="anchor" aria-hidden="true" id="start-pulsar-service"></a><a href="#start-pulsar-service" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start Pulsar Service</h2> |
| <pre><code class="hljs css language-bash">bin/pulsar standalone |
| </code></pre> |
| <p>All the components of a Pulsar service start in order. You can check the following Pulsar service endpoints to make sure the Pulsar service is up and running correctly.</p> |
| <ol> |
| <li>Check the Pulsar binary protocol port.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">telnet localhost 6650 |
| </code></pre> |
| <ol start="2"> |
| <li>Check the Pulsar Functions cluster.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/worker/cluster |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs css language-shell">[{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}] |
| </code></pre> |
| <ol start="3"> |
| <li>Make sure the <code>public</code> tenant and the <code>default</code> namespace exist.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/namespaces/public |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs css language-shell">["public/default","public/functions"] |
| </code></pre> |
| <ol start="4"> |
| <li>All builtin connectors should be listed as available.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">curl -s http://localhost:8080/admin/v2/<span class="hljs-built_in">functions</span>/connectors |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs css language-json">[{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"aerospike"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Aerospike database sink"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.aerospike.AerospikeStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"cassandra"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Writes data into Cassandra"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.cassandra.CassandraStringSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kafka"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kafka source and sink connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaStringSource"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kafka.KafkaBytesSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"kinesis"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Kinesis sink connector"</span>,<span class="hljs-attr">"sinkClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.kinesis.KinesisSink"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"rabbitmq"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"RabbitMQ source connector"</span>,<span class="hljs-attr">"sourceClass"</span>:<span class="hljs-string">"org.apache.pulsar.io.rabbitmq.RabbitMQSource"</span>},{<span class="hljs-attr">"name"</span>:<span class="hljs-string">"twitter"</span>,<span class="hljs-attr">"description"</span>:<span class="hljs-string">"Ingest data from Twitter firehose"</span>,<span class="hljs-attr">"sourceClass"</span>:<span 
class="hljs-string">"org.apache.pulsar.io.twitter.TwitterFireHose"</span>}] |
| </code></pre> |
| <p>If an error occurs while starting the Pulsar service, you may see the exception in the terminal where you are running <code>bin/pulsar standalone</code>, |
| or you can navigate to the <code>logs</code> directory under the Pulsar directory to view the logs.</p> |
| <h2><a class="anchor" aria-hidden="true" id="connect-pulsar-to-apache-cassandra"></a><a href="#connect-pulsar-to-apache-cassandra" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Connect Pulsar to Apache Cassandra</h2> |
| <blockquote> |
| <p>Make sure you have Docker available on your laptop. If you don't have Docker installed, you can follow the <a href="https://docs.docker.com/docker-for-mac/install/">instructions</a>.</p> |
| </blockquote> |
| <p>We use the <code>cassandra</code> Docker image to start a single-node Cassandra cluster in Docker.</p> |
| <h3><a class="anchor" aria-hidden="true" id="setup-the-cassandra-cluster"></a><a href="#setup-the-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Setup the Cassandra Cluster</h3> |
| <h4><a class="anchor" aria-hidden="true" id="start-a-cassandra-cluster"></a><a href="#start-a-cassandra-cluster" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Start a Cassandra Cluster</h4> |
| <pre><code class="hljs css language-bash">docker run -d --rm --name=cassandra -p 9042:9042 cassandra |
| </code></pre> |
| <p>Before moving on to the next steps, make sure the Cassandra cluster is up and running.</p> |
| <ol> |
| <li>Make sure the docker process is running.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">docker ps |
| </code></pre> |
| <ol start="2"> |
| <li>Check the Cassandra logs to make sure the Cassandra process is running as expected.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">docker logs cassandra |
| </code></pre> |
| <ol start="3"> |
| <li>Check the cluster status.</li> |
| </ol> |
| <pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> cassandra nodetool status |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs">Datacenter: datacenter1 |
| ======================= |
| <span class="hljs-attribute">Status</span>=Up/Down |
| |/ <span class="hljs-attribute">State</span>=Normal/Leaving/Joining/Moving |
| -- <span class="hljs-built_in"> Address </span> Load Tokens Owns (effective) Host ID Rack |
| UN 172.17.0.2 103.67 KiB 256 100.0% af0e4b2f-84e0-4f0b-bb14-bd5f9070ff26 rack1 |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="create-keyspace-and-table"></a><a href="#create-keyspace-and-table" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create keyspace and table</h4> |
| <p>We use <code>cqlsh</code> to connect to the Cassandra cluster and create a keyspace and a table.</p> |
| <pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost |
| Connected to Test Cluster at localhost:9042. |
| [cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4] |
| Use HELP <span class="hljs-keyword">for</span> <span class="hljs-built_in">help</span>. |
| cqlsh> |
| </code></pre> |
| <p>All the following commands are executed in <code>cqlsh</code>.</p> |
| <h5><a class="anchor" aria-hidden="true" id="create-keyspace-pulsar_test_keyspace"></a><a href="#create-keyspace-pulsar_test_keyspace" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create keyspace <code>pulsar_test_keyspace</code></h5> |
| <pre><code class="hljs css language-bash">cqlsh> CREATE KEYSPACE pulsar_test_keyspace WITH replication = {<span class="hljs-string">'class'</span>:<span class="hljs-string">'SimpleStrategy'</span>, <span class="hljs-string">'replication_factor'</span>:1}; |
| </code></pre> |
| <h5><a class="anchor" aria-hidden="true" id="create-table-pulsar_test_table"></a><a href="#create-table-pulsar_test_table" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Create table <code>pulsar_test_table</code></h5> |
| <pre><code class="hljs css language-bash">cqlsh> USE pulsar_test_keyspace; |
| cqlsh:pulsar_test_keyspace> CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text); |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="configure-a-cassandra-sink"></a><a href="#configure-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configure a Cassandra Sink</h3> |
| <p>Now that we have a Cassandra cluster running locally, this section shows how to configure a Cassandra sink connector. |
| The Cassandra sink connector reads messages from a Pulsar topic and writes them into a Cassandra table.</p> |
| <p>To run a Cassandra sink connector, you need to prepare a YAML config file containing the information that the Pulsar IO |
| runtime needs, for example, how Pulsar IO can find the Cassandra cluster, and which keyspace and table |
| Pulsar IO should write Pulsar messages to.</p> |
| <p>Create a file <code>examples/cassandra-sink.yml</code> and fill in the following content:</p> |
| <pre><code class="hljs"><span class="hljs-symbol">configs:</span> |
| <span class="hljs-symbol"> roots:</span> <span class="hljs-string">"localhost:9042"</span> |
| <span class="hljs-symbol"> keyspace:</span> <span class="hljs-string">"pulsar_test_keyspace"</span> |
| <span class="hljs-symbol"> columnFamily:</span> <span class="hljs-string">"pulsar_test_table"</span> |
| <span class="hljs-symbol"> keyname:</span> <span class="hljs-string">"key"</span> |
| <span class="hljs-symbol"> columnName:</span> <span class="hljs-string">"col"</span> |
| </code></pre> |
| <p>To learn more about Cassandra Connector, see <a href="/docs/en/2.2.0/io-cassandra">Cassandra Connector</a>.</p> |
| <h3><a class="anchor" aria-hidden="true" id="submit-a-cassandra-sink"></a><a href="#submit-a-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Submit a Cassandra Sink</h3> |
| <p>Pulsar provides the <a href="/docs/en/2.2.0/reference-cli-tools">CLI</a> for running and managing Pulsar I/O connectors.</p> |
| <p>We can run the following command to submit a sink connector with type <code>cassandra</code> and config file <code>examples/cassandra-sink.yml</code>.</p> |
| <h4><a class="anchor" aria-hidden="true" id="note"></a><a href="#note" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Note</h4> |
| <blockquote> |
| <p>The <code>sink-type</code> parameter of the currently built-in connectors is determined by the setting of the <code>name</code> parameter specified in the pulsar-io.yaml file.</p> |
| </blockquote> |
| <pre><code class="hljs css language-shell">bin/pulsar-admin sink create \ |
| --tenant public \ |
| --namespace default \ |
| --name cassandra-test-sink \ |
| --sink-type cassandra \ |
| --sink-config-file examples/cassandra-sink.yml \ |
| --inputs test_cassandra |
| </code></pre> |
| <p>Once the command is executed, Pulsar creates a sink connector named <code>cassandra-test-sink</code>. The sink connector runs |
| as a Pulsar Function and writes the messages produced to topic <code>test_cassandra</code> into Cassandra table <code>pulsar_test_table</code>.</p> |
| <h3><a class="anchor" aria-hidden="true" id="inspect-the-cassandra-sink"></a><a href="#inspect-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Inspect the Cassandra Sink</h3> |
| <p>Since an IO connector runs as a <a href="/docs/en/2.2.0/functions-overview">Pulsar Function</a>, you can use the <a href="/docs/en/2.2.0/reference-pulsar-admin#functions">functions CLI</a> |
| to inspect and manage IO connectors.</p> |
| <h4><a class="anchor" aria-hidden="true" id="retrieve-sink-info"></a><a href="#retrieve-sink-info" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Retrieve Sink Info</h4> |
| <pre><code class="hljs css language-bash">bin/pulsar-admin <span class="hljs-built_in">functions</span> get \ |
| --tenant public \ |
| --namespace default \ |
| --name cassandra-test-sink |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs css language-shell">{ |
| "tenant": "public", |
| "namespace": "default", |
| "name": "cassandra-test-sink", |
| "className": "org.apache.pulsar.functions.api.utils.IdentityFunction", |
| "autoAck": true, |
| "parallelism": 1, |
| "source": { |
| "topicsToSerDeClassName": { |
| "test_cassandra": "" |
| } |
| }, |
| "sink": { |
| "configs": "{\"roots\":\"localhost:9042\",\"keyspace\":\"pulsar_test_keyspace\",\"columnFamily\":\"pulsar_test_table\",\"keyname\":\"key\",\"columnName\":\"col\"}", |
| "builtin": "cassandra" |
| }, |
| "resources": {} |
| } |
| </code></pre> |
| <h4><a class="anchor" aria-hidden="true" id="check-sink-running-status"></a><a href="#check-sink-running-status" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Check Sink Running Status</h4> |
| <pre><code class="hljs css language-bash">bin/pulsar-admin <span class="hljs-built_in">functions</span> getstatus \ |
| --tenant public \ |
| --namespace default \ |
| --name cassandra-test-sink |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs css language-shell">{ |
| "functionStatusList": [ |
| { |
| "running": true, |
| "instanceId": "0", |
| "metrics": { |
| "metrics": { |
| "__total_processed__": {}, |
| "__total_successfully_processed__": {}, |
| "__total_system_exceptions__": {}, |
| "__total_user_exceptions__": {}, |
| "__total_serialization_exceptions__": {}, |
| "__avg_latency_ms__": {} |
| } |
| }, |
| "workerId": "c-standalone-fw-localhost-6750" |
| } |
| ] |
| } |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="verify-the-cassandra-sink"></a><a href="#verify-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Verify the Cassandra Sink</h3> |
| <p>Now let's produce some messages to <code>test_cassandra</code>, the input topic of the Cassandra sink.</p> |
| <pre><code class="hljs css language-bash"><span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> {0..9}; <span class="hljs-keyword">do</span> bin/pulsar-client produce -m <span class="hljs-string">"key-<span class="hljs-variable">$i</span>"</span> -n 1 test_cassandra; <span class="hljs-keyword">done</span> |
| </code></pre> |
| <p>Inspect the sink running status again. You should see that 10 messages have been processed by the Cassandra sink.</p> |
| <pre><code class="hljs css language-bash">bin/pulsar-admin <span class="hljs-built_in">functions</span> getstatus \ |
| --tenant public \ |
| --namespace default \ |
| --name cassandra-test-sink |
| </code></pre> |
| <p>Example output:</p> |
| <pre><code class="hljs css language-shell">{ |
| "functionStatusList": [ |
| { |
| "running": true, |
| "numProcessed": "10", |
| "numSuccessfullyProcessed": "10", |
| "lastInvocationTime": "1532031040117", |
| "instanceId": "0", |
| "metrics": { |
| "metrics": { |
| "__total_processed__": { |
| "count": 5.0, |
| "sum": 5.0, |
| "max": 5.0 |
| }, |
| "__total_successfully_processed__": { |
| "count": 5.0, |
| "sum": 5.0, |
| "max": 5.0 |
| }, |
| "__total_system_exceptions__": {}, |
| "__total_user_exceptions__": {}, |
| "__total_serialization_exceptions__": {}, |
| "__avg_latency_ms__": {} |
| } |
| }, |
| "workerId": "c-standalone-fw-localhost-6750" |
| } |
| ] |
| } |
| </code></pre> |
| <p>Finally, let's inspect the results in Cassandra using <code>cqlsh</code>:</p> |
| <pre><code class="hljs css language-bash">docker <span class="hljs-built_in">exec</span> -ti cassandra cqlsh localhost |
| </code></pre> |
| <p>Select the rows from the Cassandra table <code>pulsar_test_table</code>:</p> |
| <pre><code class="hljs css language-bash">cqlsh> use pulsar_test_keyspace; |
| cqlsh:pulsar_test_keyspace> select * from pulsar_test_table; |
| |
| key | col |
| --------+-------- |
| key-5 | key-5 |
| key-0 | key-0 |
| key-9 | key-9 |
| key-2 | key-2 |
| key-1 | key-1 |
| key-3 | key-3 |
| key-6 | key-6 |
| key-7 | key-7 |
| key-4 | key-4 |
| key-8 | key-8 |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="delete-the-cassandra-sink"></a><a href="#delete-the-cassandra-sink" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Delete the Cassandra Sink</h3> |
| <pre><code class="hljs css language-shell">bin/pulsar-admin sink delete \ |
| --tenant public \ |
| --namespace default \ |
| --name cassandra-test-sink |
| </code></pre> |
| </span></div></article></div><div class="docs-prevnext"><a class="docs-prev button" href="/docs/en/2.2.0/io-overview"><span class="arrow-prev">← </span><span>Overview</span></a><a class="docs-next button" href="/docs/en/2.2.0/io-managing"><span>Next</span><span class="arrow-next"> →</span></a></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#installing-pulsar">Installing Pulsar</a></li><li><a href="#installing-builtin-connectors">Installing Builtin Connectors</a></li><li><a href="#start-pulsar-service">Start Pulsar Service</a></li><li><a href="#connect-pulsar-to-apache-cassandra">Connect Pulsar to Apache Cassandra</a><ul class="toc-headings"><li><a href="#setup-the-cassandra-cluster">Setup the Cassandra Cluster</a></li><li><a href="#configure-a-cassandra-sink">Configure a Cassandra Sink</a></li><li><a href="#submit-a-cassandra-sink">Submit a Cassandra Sink</a></li><li><a href="#inspect-the-cassandra-sink">Inspect the Cassandra Sink</a></li><li><a href="#verify-the-cassandra-sink">Verify the Cassandra Sink</a></li><li><a href="#delete-the-cassandra-sink">Delete the Cassandra Sink</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script> |
| const community = document.querySelector("a[href='#community']").parentNode; |
| const communityMenu = |
| '<li>' + |
| '<a id="community-menu" href="#">Community <span style="font-size: 0.75em"> ▼</span></a>' + |
| '<div id="community-dropdown" class="hide">' + |
| '<ul id="community-dropdown-items">' + |
| '<li><a href="/en/contact">Contact</a></li>' + |
| '<li><a href="/en/contributing">Contributing</a></li>' + |
| '<li><a href="/en/coding-guide">Coding guide</a></li>' + |
| '<li><a href="/en/events">Events</a></li>' + |
| '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking ❐</a></li>' + |
| '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit ❐</a></li>' + |
| '<li> </li>' + |
| '<li><a href="/en/resources">Resources</a></li>' + |
| '<li><a href="/en/team">Team</a></li>' + |
| '<li><a href="/en/powered-by">Powered By</a></li>' + |
| '</ul>' + |
| '</div>' + |
| '</li>'; |
| |
| community.innerHTML = communityMenu; |
| |
| const communityMenuItem = document.getElementById("community-menu"); |
| const communityDropDown = document.getElementById("community-dropdown"); |
| communityMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (communityDropDown.className == 'hide') { |
| communityDropDown.className = 'visible'; |
| } else { |
| communityDropDown.className = 'hide'; |
| } |
| }); |
| </script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html> |