| <!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>CDC Canal Connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="### Source Configuration Options"/><meta name="docsearch:version" content="2.4.1"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="CDC Canal Connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="### Source Configuration Options"/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.4.1</h3></a><div class="navigationWrapper 
navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/2.4.1/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/2.4.1/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.4.1/io-cdc-canal">日本語</a></li><li><a href="/docs/fr/2.4.1/io-cdc-canal">Français</a></li><li><a href="/docs/ko/2.4.1/io-cdc-canal">한국어</a></li><li><a href="/docs/zh-CN/2.4.1/io-cdc-canal">中文</a></li><li><a href="/docs/zh-TW/2.4.1/io-cdc-canal">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script> |
| const languagesMenuItem = document.getElementById("languages-menu"); |
| const languagesDropDown = document.getElementById("languages-dropdown"); |
| languagesMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (languagesDropDown.className == "hide") { |
| languagesDropDown.className = "visible"; |
| } else { |
| languagesDropDown.className = "hide"; |
| } |
| }); |
| </script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-cdc-canal.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">CDC Canal Connector</h1></header><article><div><span><h3><a class="anchor" aria-hidden="true" id="source-configuration-options"></a><a href="#source-configuration-options" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Source Configuration Options</h3> |
| <p>The configuration is mostly related to the Canal task config.</p> |
| <table> |
| <thead> |
| <tr><th>Name</th><th>Required</th><th>Default</th><th>Description</th></tr> |
| </thead> |
| <tbody> |
| <tr><td><code>zkServers</code></td><td><code>false</code></td><td><code>127.0.0.1:2181</code></td><td><code>The address and port of ZooKeeper, used if the canal server is configured in cluster mode</code></td></tr> |
| <tr><td><code>batchSize</code></td><td><code>true</code></td><td><code>5120</code></td><td><code>Take 5120 records from the canal server in batches</code></td></tr> |
| <tr><td><code>username</code></td><td><code>false</code></td><td></td><td><code>Canal server account, not MySQL</code></td></tr> |
| <tr><td><code>password</code></td><td><code>false</code></td><td></td><td><code>Canal server password, not MySQL</code></td></tr> |
| <tr><td><code>cluster</code></td><td><code>false</code></td><td><code>false</code></td><td><code>Decide whether to open cluster mode based on canal server configuration, true: cluster mode, false: standalone mode</code></td></tr> |
| <tr><td><code>singleHostname</code></td><td><code>false</code></td><td><code>127.0.0.1</code></td><td><code>The address of canal server</code></td></tr> |
| <tr><td><code>singlePort</code></td><td><code>false</code></td><td><code>11111</code></td><td><code>The port of canal server</code></td></tr> |
| </tbody> |
| </table> |
| <h3><a class="anchor" aria-hidden="true" id="configuration-example"></a><a href="#configuration-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration Example</h3> |
| <p>Here is a JSON configuration example:</p> |
| <pre><code class="hljs css language-$json">{ |
| <span class="hljs-attr">"zkServers"</span>: <span class="hljs-string">"127.0.0.1:2181"</span>, |
| <span class="hljs-attr">"batchSize"</span>: <span class="hljs-string">"5120"</span>, |
| <span class="hljs-attr">"destination"</span>: <span class="hljs-string">"example"</span>, |
| <span class="hljs-attr">"username"</span>: <span class="hljs-string">""</span>, |
| <span class="hljs-attr">"password"</span>: <span class="hljs-string">""</span>, |
| <span class="hljs-attr">"cluster"</span>: <span class="hljs-literal">false</span>, |
| <span class="hljs-attr">"singleHostname"</span>: <span class="hljs-string">"127.0.0.1"</span>, |
| <span class="hljs-attr">"singlePort"</span>: <span class="hljs-string">"11111"</span> |
| } |
| </code></pre> |
| <p>You can also find a YAML example in this <a href="https://github.com/apache/pulsar/blob/master/pulsar-io/canal/src/main/resources/canal-mysql-source-config.yaml">file</a>, which has content similar to the following:</p> |
| <pre><code class="hljs css language-$yaml"><span class="hljs-symbol">configs:</span> |
| <span class="hljs-symbol"> zkServers:</span> <span class="hljs-string">"127.0.0.1:2181"</span> |
| <span class="hljs-symbol"> batchSize:</span> <span class="hljs-string">"5120"</span> |
| <span class="hljs-symbol"> destination:</span> <span class="hljs-string">"example"</span> |
| <span class="hljs-symbol"> username:</span> <span class="hljs-string">""</span> |
| <span class="hljs-symbol"> password:</span> <span class="hljs-string">""</span> |
| <span class="hljs-symbol"> cluster:</span> false |
| <span class="hljs-symbol"> singleHostname:</span> <span class="hljs-string">"127.0.0.1"</span> |
| <span class="hljs-symbol"> singlePort:</span> <span class="hljs-string">"11111"</span> |
| </code></pre> |
| <h3><a class="anchor" aria-hidden="true" id="usage-example"></a><a href="#usage-example" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage example</h3> |
| <p>Here is a simple example that stores MySQL change data using the example config above.</p> |
| <ul> |
| <li>Start a MySQL server</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker pull mysql:5.7 |
| docker <span class="hljs-builtin-name">run</span> -d -it --rm --name pulsar-mysql -p 3306:3306 -e <span class="hljs-attribute">MYSQL_ROOT_PASSWORD</span>=canal -e <span class="hljs-attribute">MYSQL_USER</span>=mysqluser -e <span class="hljs-attribute">MYSQL_PASSWORD</span>=mysqlpw mysql:5.7 |
| </code></pre> |
| <ul> |
| <li>Modify the configuration file mysqld.cnf</li> |
| </ul> |
| <pre><code class="hljs"><span class="hljs-section">[mysqld]</span> |
| <span class="hljs-attr">pid-file</span> = /var/run/mysqld/mysqld.pid |
| <span class="hljs-attr">socket</span> = /var/run/mysqld/mysqld.sock |
| <span class="hljs-attr">datadir</span> = /var/lib/mysql |
| <span class="hljs-comment">#log-error = /var/log/mysql/error.log</span> |
| <span class="hljs-comment"># By default we only accept connections from localhost</span> |
| <span class="hljs-comment">#bind-address = 127.0.0.1</span> |
| <span class="hljs-comment"># Disabling symbolic-links is recommended to prevent assorted security risks</span> |
| <span class="hljs-attr">symbolic-links</span>=<span class="hljs-number">0</span> |
| <span class="hljs-attr">log-bin</span>=mysql-bin |
| <span class="hljs-attr">binlog-format</span>=ROW |
| <span class="hljs-attr">server_id</span>=<span class="hljs-number">1</span> |
| </code></pre> |
| <ul> |
| <li>Copy the file from the local machine to the MySQL server and restart the MySQL server</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker <span class="hljs-keyword">cp</span> mysqld.<span class="hljs-keyword">cnf</span> pulsar-mysq<span class="hljs-variable">l:</span>/etc/mysql/mysql.<span class="hljs-keyword">conf</span>.d/ |
| docker restart pulsar-mysql |
| </code></pre> |
| <ul> |
| <li>Create a test database in the MySQL server</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker exec -it pulsar-mysql /bin/bash |
| mysql -h 127.0.0.1 -uroot -pcanal -e '<span class="hljs-keyword">create</span> <span class="hljs-keyword">database</span> <span class="hljs-keyword">test</span>;' |
| </code></pre> |
| <ul> |
| <li>Start the canal server and connect it to the MySQL server</li> |
| </ul> |
| <pre><code class="hljs">docker pull canal/canal-server:v1.1.2 |
| docker <span class="hljs-builtin-name">run</span> -d -it --link pulsar-mysql -e canal.auto.<span class="hljs-attribute">scan</span>=<span class="hljs-literal">false</span> -e canal.<span class="hljs-attribute">destinations</span>=test -e canal.instance.master.<span class="hljs-attribute">address</span>=pulsar-mysql:3306 -e canal.instance.<span class="hljs-attribute">dbUsername</span>=root -e canal.instance.<span class="hljs-attribute">dbPassword</span>=canal -e canal.instance.<span class="hljs-attribute">connectionCharset</span>=UTF-8 -e canal.instance.tsdb.<span class="hljs-attribute">enable</span>=<span class="hljs-literal">true</span> -e canal.instance.<span class="hljs-attribute">gtidon</span>=<span class="hljs-literal">false</span> <span class="hljs-attribute">--name</span>=pulsar-canal-server -p 8000:8000 -p 2222:2222 -p 11111:11111 -p 11112:11112 -m 4096m canal/canal-server:v1.1.2 |
| </code></pre> |
| <ul> |
| <li>Start pulsar standalone</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker pull apachepulsar/pulsar:<span class="hljs-number">2.4</span><span class="hljs-number">.1</span> |
| docker run -d -it --link pulsar-canal-server -p <span class="hljs-number">6650</span>:<span class="hljs-number">6650</span> -p <span class="hljs-number">8080</span>:<span class="hljs-number">8080</span> -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:<span class="hljs-number">2.4</span><span class="hljs-number">.1</span> bin/pulsar standalone |
| </code></pre> |
| <ul> |
| <li><p>Start pulsar-io in standalone</p></li> |
| <li><p>Config file canal-mysql-source-config.yaml</p></li> |
| </ul> |
| <pre><code class="hljs css language-$yaml"><span class="hljs-symbol">configs:</span> |
| <span class="hljs-symbol"> zkServers:</span> <span class="hljs-string">""</span> |
| <span class="hljs-symbol"> batchSize:</span> <span class="hljs-string">"5120"</span> |
| <span class="hljs-symbol"> destination:</span> <span class="hljs-string">"test"</span> |
| <span class="hljs-symbol"> username:</span> <span class="hljs-string">""</span> |
| <span class="hljs-symbol"> password:</span> <span class="hljs-string">""</span> |
| <span class="hljs-symbol"> cluster:</span> false |
| <span class="hljs-symbol"> singleHostname:</span> <span class="hljs-string">"pulsar-canal-server"</span> |
| <span class="hljs-symbol"> singlePort:</span> <span class="hljs-string">"11111"</span> |
| </code></pre> |
| <ul> |
| <li>Consumer file pulsar-client.py for test</li> |
| </ul> |
| <pre><code class="hljs">import pulsar |
| <span class="hljs-built_in"> |
| client </span>= pulsar.Client(<span class="hljs-string">'pulsar://localhost:6650'</span>) |
| consumer = client.subscribe(<span class="hljs-string">'my-topic'</span>, |
| <span class="hljs-attribute">subscription_name</span>=<span class="hljs-string">'my-sub'</span>) |
| |
| <span class="hljs-keyword">while</span> <span class="hljs-literal">True</span>: |
|     msg = consumer.receive() |
|     <span class="hljs-builtin-name">print</span>(<span class="hljs-string">"Received message: '%s'"</span> % msg.data()) |
|     consumer.acknowledge(msg) |
| |
| client.close() |
| </code></pre> |
| <ul> |
| <li>Copy the config file and the test file to the Pulsar server</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker <span class="hljs-keyword">cp</span> canal-mysql-<span class="hljs-keyword">source</span>-config.yaml pulsar-standalone:/pulsar/<span class="hljs-keyword">conf</span>/ |
| docker <span class="hljs-keyword">cp</span> pulsar-client.<span class="hljs-keyword">py</span> pulsar-standalone:/pulsar/ |
| </code></pre> |
| <ul> |
| <li>Download the canal connector and start it</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker exec -it pulsar-standalone <span class="hljs-string">/bin/bash</span> |
| wget http:<span class="hljs-string">//apache.01link.hk/pulsar/pulsar-2.4.1/connectors/pulsar-io-canal-2.4.1.nar</span> -P connectors |
| <span class="hljs-string">./bin/pulsar-admin</span> sources localrun <span class="hljs-params">--archive</span> <span class="hljs-string">./connectors/pulsar-io-canal-2.4.1.nar</span> <span class="hljs-params">--classname</span> org.apache.pulsar.io.canal.CanalStringSource <span class="hljs-params">--tenant</span> public <span class="hljs-params">--namespace</span> default <span class="hljs-params">--name</span> canal <span class="hljs-params">--destination-topic-name</span> my-topic <span class="hljs-params">--source-config-file</span> <span class="hljs-string">/pulsar/conf/canal-mysql-source-config.yaml</span> <span class="hljs-params">--parallelism</span> 1 |
| </code></pre> |
| <ul> |
| <li>Consume data</li> |
| </ul> |
| <pre><code class="hljs css language-$bash"><span class="hljs-symbol">docker</span> exec -<span class="hljs-keyword">it </span>pulsar-standalone /<span class="hljs-keyword">bin/bash |
| </span><span class="hljs-symbol">python</span> pulsar-client.py |
| </code></pre> |
| <ul> |
| <li>Open another window to log in to the MySQL server</li> |
| </ul> |
| <pre><code class="hljs css language-$bash">docker exec -it pulsar-mysql /bin/bash |
| mysql -h <span class="hljs-number">127.0</span><span class="hljs-number">.0</span><span class="hljs-number">.1</span> -uroot -pcanal |
| </code></pre> |
| <ul> |
| <li>Create a table, then insert, update, and delete data in the MySQL server</li> |
| </ul> |
| <pre><code class="hljs">mysql> use test; |
| mysql> show tables; |
| mysql> <span class="hljs-keyword">CREATE</span> TABLE <span class="hljs-keyword">IF</span> <span class="hljs-keyword">NOT</span> <span class="hljs-keyword">EXISTS</span> <span class="hljs-symbol">`test_table`</span>(<span class="hljs-symbol">`test_id`</span> INT UNSIGNED AUTO_INCREMENT,<span class="hljs-symbol">`test_title`</span> VARCHAR(<span class="hljs-number">100</span>) <span class="hljs-keyword">NOT</span> <span class="hljs-literal">NULL</span>, |
| <span class="hljs-symbol">`test_author`</span> VARCHAR(<span class="hljs-number">40</span>) <span class="hljs-keyword">NOT</span> <span class="hljs-literal">NULL</span>, |
| <span class="hljs-symbol">`test_date`</span> DATE,<span class="hljs-keyword">PRIMARY</span> <span class="hljs-keyword">KEY</span> ( <span class="hljs-symbol">`test_id`</span> ))ENGINE=InnoDB DEFAULT CHARSET=utf8; |
| mysql> <span class="hljs-keyword">INSERT</span> <span class="hljs-keyword">INTO</span> test_table (test_title, test_author, test_date) <span class="hljs-keyword">VALUES</span>(<span class="hljs-string">"a"</span>, <span class="hljs-string">"b"</span>, NOW()); |
| mysql> <span class="hljs-keyword">UPDATE</span> test_table <span class="hljs-keyword">SET</span> test_title=<span class="hljs-string">'c'</span> <span class="hljs-keyword">WHERE</span> test_title=<span class="hljs-string">'a'</span>; |
| mysql> <span class="hljs-keyword">DELETE</span> <span class="hljs-keyword">FROM</span> test_table <span class="hljs-keyword">WHERE</span> test_title=<span class="hljs-string">'c'</span>; |
| </code></pre> |
| </span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script> |
| const community = document.querySelector("a[href='#community']").parentNode; |
| const communityMenu = |
| '<li>' + |
| '<a id="community-menu" href="#">Community <span style="font-size: 0.75em"> ▼</span></a>' + |
| '<div id="community-dropdown" class="hide">' + |
| '<ul id="community-dropdown-items">' + |
| '<li><a href="/en/contact">Contact</a></li>' + |
| '<li><a href="/en/contributing">Contributing</a></li>' + |
| '<li><a href="/en/coding-guide">Coding guide</a></li>' + |
| '<li><a href="/en/events">Events</a></li>' + |
| '<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki ❐</a></li>' + |
| '<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking ❐</a></li>' + |
| '<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit ❐</a></li>' + |
| '<li> </li>' + |
| '<li><a href="/en/resources">Resources</a></li>' + |
| '<li><a href="/en/team">Team</a></li>' + |
| '<li><a href="/en/powered-by">Powered By</a></li>' + |
| '</ul>' + |
| '</div>' + |
| '</li>'; |
| |
| community.innerHTML = communityMenu; |
| |
| const communityMenuItem = document.getElementById("community-menu"); |
| const communityDropDown = document.getElementById("community-dropdown"); |
| communityMenuItem.addEventListener("click", function(event) { |
| event.preventDefault(); |
| |
| if (communityDropDown.className == 'hide') { |
| communityDropDown.className = 'visible'; |
| } else { |
| communityDropDown.className = 'hide'; |
| } |
| }); |
| </script></span></footer></div><!-- Twitter widgets loader: unless an element with id "twitter-wjs" already exists, creates a script element for https://platform.twitter.com/widgets.js and inserts it before the first script element in the document; exposes window.twttr with a ready(f) method that queues callbacks in twttr._e. --><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html> |