blob: 555a5137d27f6fb335cae867124ea880d44513da [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charSet="utf-8"/><meta http-equiv="X-UA-Compatible" content="IE=edge"/><title>Debezium source connector · Apache Pulsar</title><meta name="viewport" content="width=device-width, initial-scale=1.0"/><meta name="generator" content="Docusaurus"/><meta name="description" content="The Debezium source connector pulls messages from MySQL or PostgreSQL "/><meta name="docsearch:version" content="2.6.4"/><meta name="docsearch:language" content="en"/><meta property="og:title" content="Debezium source connector · Apache Pulsar"/><meta property="og:type" content="website"/><meta property="og:url" content="https://pulsar.apache.org/"/><meta property="og:description" content="The Debezium source connector pulls messages from MySQL or PostgreSQL "/><meta name="twitter:card" content="summary"/><meta name="twitter:image" content="https://pulsar.apache.org/img/pulsar.svg"/><link rel="shortcut icon" href="/img/pulsar.ico"/><link rel="stylesheet" href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/atom-one-dark.min.css"/><link rel="alternate" type="application/atom+xml" href="https://pulsar.apache.org/blog/atom.xml" title="Apache Pulsar Blog ATOM Feed"/><link rel="alternate" type="application/rss+xml" href="https://pulsar.apache.org/blog/feed.xml" title="Apache Pulsar Blog RSS Feed"/><link rel="stylesheet" href="/css/code-blocks-buttons.css"/><script type="text/javascript" src="https://buttons.github.io/buttons.js"></script><script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/clipboard.js/2.0.0/clipboard.min.js"></script><script type="text/javascript" src="/js/custom.js"></script><script src="/js/scrollSpy.js"></script><link rel="stylesheet" href="/css/main.css"/><script src="/js/codetabs.js"></script></head><body class="sideNavVisible separateOnPageNav"><div class="fixedHeaderContainer"><div class="headerWrapper wrapper"><header><a href="/en"><img class="logo" src="/img/pulsar.svg" alt="Apache Pulsar"/></a><a href="/en/versions"><h3>2.6.4</h3></a><div class="navigationWrapper navigationSlider"><nav class="slidingNav"><ul class="nav-site nav-site-internal"><li class=""><a href="/docs/en/2.6.4/getting-started-standalone" target="_self">Docs</a></li><li class=""><a href="/en/download" target="_self">Download</a></li><li class=""><a href="/docs/en/2.6.4/client-libraries" target="_self">Clients</a></li><li class=""><a href="#restapis" target="_self">REST APIs</a></li><li class=""><a href="#cli" target="_self">Cli</a></li><li class=""><a href="/blog/" target="_self">Blog</a></li><li class=""><a href="#community" target="_self">Community</a></li><li class=""><a href="#apache" target="_self">Apache</a></li><li class=""><a href="https://pulsar-next.staged.apache.org/" target="_self">New Website (Beta)</a></li><span><li><a id="languages-menu" href="#"><img class="languages-icon" src="/img/language.svg" alt="Languages icon"/>English</a><div id="languages-dropdown" class="hide"><ul id="languages-dropdown-items"><li><a href="/docs/ja/2.6.4/io-debezium-source">日本語</a></li><li><a href="/docs/fr/2.6.4/io-debezium-source">Français</a></li><li><a href="/docs/ko/2.6.4/io-debezium-source">한국어</a></li><li><a href="/docs/zh-CN/2.6.4/io-debezium-source">中文</a></li><li><a href="/docs/zh-TW/2.6.4/io-debezium-source">繁體中文</a></li><li><a href="https://crowdin.com/project/apache-pulsar" target="_blank" rel="noreferrer noopener">Help Translate</a></li></ul></div></li><script>
const languagesMenuItem = document.getElementById("languages-menu");
const languagesDropDown = document.getElementById("languages-dropdown");
languagesMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (languagesDropDown.className == "hide") {
languagesDropDown.className = "visible";
} else {
languagesDropDown.className = "hide";
}
});
</script></span></ul></nav></div></header></div></div><div class="navPusher"><div class="docMainWrapper wrapper"><div class="container mainContainer docsContainer"><div class="wrapper"><div class="post"><header class="postHeader"><a class="edit-page-link button" href="https://github.com/apache/pulsar/edit/master/site2/docs/io-debezium-source.md" target="_blank" rel="noreferrer noopener">Edit</a><h1 id="__docusaurus" class="postHeaderTitle">Debezium source connector</h1></header><article><div><span><p>The Debezium source connector pulls messages from MySQL or PostgreSQL
and persists the messages to Pulsar topics.</p>
<h2><a class="anchor" aria-hidden="true" id="configuration"></a><a href="#configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h2>
<p>The configuration of Debezium source connector has the following properties.</p>
<table>
<thead>
<tr><th>Name</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>task.class</code></td><td>true</td><td>null</td><td>A source task class that implemented in Debezium.</td></tr>
<tr><td><code>database.hostname</code></td><td>true</td><td>null</td><td>The address of a database server.</td></tr>
<tr><td><code>database.port</code></td><td>true</td><td>null</td><td>The port number of a database server.</td></tr>
<tr><td><code>database.user</code></td><td>true</td><td>null</td><td>The name of a database user that has the required privileges.</td></tr>
<tr><td><code>database.password</code></td><td>true</td><td>null</td><td>The password for a database user that has the required privileges.</td></tr>
<tr><td><code>database.server.id</code></td><td>true</td><td>null</td><td>The connector’s identifier that must be unique within a database cluster and similar to the database’s server-id configuration property.</td></tr>
<tr><td><code>database.server.name</code></td><td>true</td><td>null</td><td>The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.</td></tr>
<tr><td><code>database.whitelist</code></td><td>false</td><td>null</td><td>A list of all databases hosted by this server which is monitored by the connector.<br/><br/> This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring.</td></tr>
<tr><td><code>key.converter</code></td><td>true</td><td>null</td><td>The converter provided by Kafka Connect to convert record key.</td></tr>
<tr><td><code>value.converter</code></td><td>true</td><td>null</td><td>The converter provided by Kafka Connect to convert record value.</td></tr>
<tr><td><code>database.history</code></td><td>true</td><td>null</td><td>The name of the database history class.</td></tr>
<tr><td><code>database.history.pulsar.topic</code></td><td>true</td><td>null</td><td>The name of the database history topic where the connector writes and recovers DDL statements. <br/><br/><strong>Note: this topic is for internal use only and should not be used by consumers.</strong></td></tr>
<tr><td><code>database.history.pulsar.service.url</code></td><td>true</td><td>null</td><td>Pulsar cluster service URL for history topic.</td></tr>
<tr><td><code>pulsar.service.url</code></td><td>true</td><td>null</td><td>Pulsar cluster service URL.</td></tr>
<tr><td><code>offset.storage.topic</code></td><td>true</td><td>null</td><td>Record the last committed offsets that the connector successfully completes.</td></tr>
<tr><td><code>json-with-envelope</code></td><td>false</td><td>false</td><td>Present the message only consist of payload.</td></tr>
</tbody>
</table>
<h3><a class="anchor" aria-hidden="true" id="converter-options"></a><a href="#converter-options" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Converter Options</h3>
<ol>
<li>org.apache.kafka.connect.json.JsonConverter</li>
</ol>
<p>This config <code>json-with-envelope</code> is valid only for the JsonConverter. It's default value is false, the consumer use the schema <code>Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)</code>,
and the message only consist of payload.</p>
<p>If the config <code>json-with-envelope</code> value is true, the consumer use the schema
<code>Schema.KeyValue(Schema.BYTES, Schema.BYTES</code>, the message consist of schema and payload.</p>
<ol start="2">
<li>org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter</li>
</ol>
<p>If users select the AvroConverter, then the pulsar consumer should use the schema <code>Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED)</code>, and the message consist of payload.</p>
<h3><a class="anchor" aria-hidden="true" id="mongodb-configuration"></a><a href="#mongodb-configuration" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>MongoDB Configuration</h3>
<table>
<thead>
<tr><th>Name</th><th>Required</th><th>Default</th><th>Description</th></tr>
</thead>
<tbody>
<tr><td><code>mongodb.hosts</code></td><td>true</td><td>null</td><td>The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017).</td></tr>
<tr><td><code>mongodb.name</code></td><td>true</td><td>null</td><td>A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.</td></tr>
<tr><td><code>mongodb.user</code></td><td>true</td><td>null</td><td>Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.</td></tr>
<tr><td><code>mongodb.password</code></td><td>true</td><td>null</td><td>Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.</td></tr>
<tr><td><code>mongodb.task.id</code></td><td>true</td><td>null</td><td>The taskId of the MongoDB connector that attempts to use a separate task for each replica set.</td></tr>
</tbody>
</table>
<h2><a class="anchor" aria-hidden="true" id="example-of-mysql"></a><a href="#example-of-mysql" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example of MySQL</h2>
<p>You need to create a configuration file before using the Pulsar Debezium connector.</p>
<h3><a class="anchor" aria-hidden="true" id="configuration-1"></a><a href="#configuration-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h3>
<p>You can use one of the following methods to create a configuration file.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"database.hostname"</span>: <span class="hljs-string">"localhost"</span>,
<span class="hljs-attr">"database.port"</span>: <span class="hljs-string">"3306"</span>,
<span class="hljs-attr">"database.user"</span>: <span class="hljs-string">"debezium"</span>,
<span class="hljs-attr">"database.password"</span>: <span class="hljs-string">"dbz"</span>,
<span class="hljs-attr">"database.server.id"</span>: <span class="hljs-string">"184054"</span>,
<span class="hljs-attr">"database.server.name"</span>: <span class="hljs-string">"dbserver1"</span>,
<span class="hljs-attr">"database.whitelist"</span>: <span class="hljs-string">"inventory"</span>,
<span class="hljs-attr">"database.history"</span>: <span class="hljs-string">"org.apache.pulsar.io.debezium.PulsarDatabaseHistory"</span>,
<span class="hljs-attr">"database.history.pulsar.topic"</span>: <span class="hljs-string">"history-topic"</span>,
<span class="hljs-attr">"database.history.pulsar.service.url"</span>: <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>,
<span class="hljs-attr">"key.converter"</span>: <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>,
<span class="hljs-attr">"value.converter"</span>: <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>,
<span class="hljs-attr">"pulsar.service.url"</span>: <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>,
<span class="hljs-attr">"offset.storage.topic"</span>: <span class="hljs-string">"offset-topic"</span>
}
</code></pre></li>
<li><p>YAML</p>
<p>You can create a <code>debezium-mysql-source-config.yaml</code> file and copy the <a href="https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml">contents</a> below to the <code>debezium-mysql-source-config.yaml</code> file.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">tenant:</span> <span class="hljs-string">"public"</span>
<span class="hljs-attr">namespace:</span> <span class="hljs-string">"default"</span>
<span class="hljs-attr">name:</span> <span class="hljs-string">"debezium-mysql-source"</span>
<span class="hljs-attr">topicName:</span> <span class="hljs-string">"debezium-mysql-topic"</span>
<span class="hljs-attr">archive:</span> <span class="hljs-string">"connectors/pulsar-io-debezium-mysql-<span class="hljs-template-variable">2.6.4</span>.nar"</span>
<span class="hljs-attr">parallelism:</span> <span class="hljs-number">1</span>
<span class="hljs-attr">configs:</span>
<span class="hljs-comment">## config for mysql, docker image: debezium/example-mysql:0.8</span>
<span class="hljs-attr">database.hostname:</span> <span class="hljs-string">"localhost"</span>
<span class="hljs-attr">database.port:</span> <span class="hljs-string">"3306"</span>
<span class="hljs-attr">database.user:</span> <span class="hljs-string">"debezium"</span>
<span class="hljs-attr">database.password:</span> <span class="hljs-string">"dbz"</span>
<span class="hljs-attr">database.server.id:</span> <span class="hljs-string">"184054"</span>
<span class="hljs-attr">database.server.name:</span> <span class="hljs-string">"dbserver1"</span>
<span class="hljs-attr">database.whitelist:</span> <span class="hljs-string">"inventory"</span>
<span class="hljs-attr">database.history:</span> <span class="hljs-string">"org.apache.pulsar.io.debezium.PulsarDatabaseHistory"</span>
<span class="hljs-attr">database.history.pulsar.topic:</span> <span class="hljs-string">"history-topic"</span>
<span class="hljs-attr">database.history.pulsar.service.url:</span> <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>
<span class="hljs-comment">## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG</span>
<span class="hljs-attr">key.converter:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
<span class="hljs-attr">value.converter:</span> <span class="hljs-string">"org.apache.kafka.connect.json.JsonConverter"</span>
<span class="hljs-comment">## PULSAR_SERVICE_URL_CONFIG</span>
<span class="hljs-attr">pulsar.service.url:</span> <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>
<span class="hljs-comment">## OFFSET_STORAGE_TOPIC_CONFIG</span>
<span class="hljs-attr">offset.storage.topic:</span> <span class="hljs-string">"offset-topic"</span>
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="usage"></a><a href="#usage" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage</h3>
<p>This example shows how to change the data of a MySQL table using the Pulsar Debezium connector.</p>
<ol>
<li><p>Start a MySQL server with a database from which Debezium can capture changes.</p>
<pre><code class="hljs css language-bash">$ docker run -it --rm \
--name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
</code></pre></li>
<li><p>Start a Pulsar service locally in standalone mode.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar standalone
</code></pre></li>
<li><p>Start the Pulsar Debezium connector in local run mode using one of the following methods.</p>
<ul>
<li><p>Use the <strong>JSON</strong> configuration file as shown previously.</p>
<p>Make sure the nar file is available at <code>connectors/pulsar-io-debezium-mysql-2.6.4.nar</code>.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--archive connectors/pulsar-io-debezium-mysql-2.6.4.nar \
--name debezium-mysql-source --destination-topic-name debezium-mysql-topic \
--tenant public \
--namespace default \
--<span class="hljs-built_in">source</span>-config <span class="hljs-string">'{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory","database.history.pulsar.topic": "history-topic","database.history.pulsar.service.url": "pulsar://127.0.0.1:6650","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","pulsar.service.url": "pulsar://127.0.0.1:6650","offset.storage.topic": "offset-topic"}'</span>
</code></pre></li>
<li><p>Use the <strong>YAML</strong> configuration file as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--<span class="hljs-built_in">source</span>-config-file debezium-mysql-source-config.yaml
</code></pre></li>
</ul></li>
<li><p>Subscribe the topic <em>sub-products</em> for the table <em>inventory.products</em>.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-client consume -s <span class="hljs-string">"sub-products"</span> public/default/dbserver1.inventory.products -n 0
</code></pre></li>
<li><p>Start a MySQL client in docker.</p>
<pre><code class="hljs css language-bash">$ docker run -it --rm \
--name mysqlterm \
--link mysql \
--rm mysql:5.7 sh \
-c <span class="hljs-string">'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'</span>
</code></pre></li>
<li><p>A MySQL client pops out.</p>
<p>Use the following commands to change the data of the table <em>products</em>.</p>
<pre><code class="hljs">mysql&gt; use inventory;
mysql&gt; <span class="hljs-keyword">show</span> <span class="hljs-keyword">tables</span>;
mysql&gt; <span class="hljs-keyword">SELECT</span> * <span class="hljs-keyword">FROM</span> products;
mysql&gt; <span class="hljs-keyword">UPDATE</span> products <span class="hljs-keyword">SET</span> <span class="hljs-type">name</span>=<span class="hljs-string">'1111111111'</span> <span class="hljs-keyword">WHERE</span> id=<span class="hljs-number">101</span>;
mysql&gt; <span class="hljs-keyword">UPDATE</span> products <span class="hljs-keyword">SET</span> <span class="hljs-type">name</span>=<span class="hljs-string">'1111111111'</span> <span class="hljs-keyword">WHERE</span> id=<span class="hljs-number">107</span>;
</code></pre>
<p>In the terminal window of subscribing topic, you can find the data changes have been kept in the <em>sub-products</em> topic.</p></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="example-of-postgresql"></a><a href="#example-of-postgresql" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example of PostgreSQL</h2>
<p>You need to create a configuration file before using the Pulsar Debezium connector.</p>
<h3><a class="anchor" aria-hidden="true" id="configuration-2"></a><a href="#configuration-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Configuration</h3>
<p>You can use one of the following methods to create a configuration file.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"database.hostname"</span>: <span class="hljs-string">"localhost"</span>,
<span class="hljs-attr">"database.port"</span>: <span class="hljs-string">"5432"</span>,
<span class="hljs-attr">"database.user"</span>: <span class="hljs-string">"postgres"</span>,
<span class="hljs-attr">"database.password"</span>: <span class="hljs-string">"postgres"</span>,
<span class="hljs-attr">"database.dbname"</span>: <span class="hljs-string">"postgres"</span>,
<span class="hljs-attr">"database.server.name"</span>: <span class="hljs-string">"dbserver1"</span>,
<span class="hljs-attr">"schema.whitelist"</span>: <span class="hljs-string">"inventory"</span>,
<span class="hljs-attr">"pulsar.service.url"</span>: <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>
}
</code></pre></li>
<li><p>YAML</p>
<p>You can create a <code>debezium-postgres-source-config.yaml</code> file and copy the <a href="https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml">contents</a> below to the <code>debezium-postgres-source-config.yaml</code> file.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">tenant:</span> <span class="hljs-string">"public"</span>
<span class="hljs-attr">namespace:</span> <span class="hljs-string">"default"</span>
<span class="hljs-attr">name:</span> <span class="hljs-string">"debezium-postgres-source"</span>
<span class="hljs-attr">topicName:</span> <span class="hljs-string">"debezium-postgres-topic"</span>
<span class="hljs-attr">archive:</span> <span class="hljs-string">"connectors/pulsar-io-debezium-postgres-<span class="hljs-template-variable">2.6.4</span>.nar"</span>
<span class="hljs-attr">parallelism:</span> <span class="hljs-number">1</span>
<span class="hljs-attr">configs:</span>
<span class="hljs-comment">## config for pg, docker image: debezium/example-postgress:0.8</span>
<span class="hljs-attr">database.hostname:</span> <span class="hljs-string">"localhost"</span>
<span class="hljs-attr">database.port:</span> <span class="hljs-string">"5432"</span>
<span class="hljs-attr">database.user:</span> <span class="hljs-string">"postgres"</span>
<span class="hljs-attr">database.password:</span> <span class="hljs-string">"postgres"</span>
<span class="hljs-attr">database.dbname:</span> <span class="hljs-string">"postgres"</span>
<span class="hljs-attr">database.server.name:</span> <span class="hljs-string">"dbserver1"</span>
<span class="hljs-attr">schema.whitelist:</span> <span class="hljs-string">"inventory"</span>
<span class="hljs-comment">## PULSAR_SERVICE_URL_CONFIG</span>
<span class="hljs-attr">pulsar.service.url:</span> <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="usage-1"></a><a href="#usage-1" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage</h3>
<p>This example shows how to change the data of a PostgreSQL table using the Pulsar Debezium connector.</p>
<ol>
<li><p>Start a PostgreSQL server with a database from which Debezium can capture changes.</p>
<pre><code class="hljs css language-bash">$ docker pull debezium/example-postgres:0.8
$ docker run -d -it --rm --name pulsar-postgresql -p 5432:5432 debezium/example-postgres:0.8
</code></pre></li>
<li><p>Start a Pulsar service locally in standalone mode.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar standalone
</code></pre></li>
<li><p>Start the Pulsar Debezium connector in local run mode using one of the following methods.</p>
<ul>
<li><p>Use the <strong>JSON</strong> configuration file as shown previously.</p>
<p>Make sure the nar file is available at <code>connectors/pulsar-io-debezium-postgres-2.6.4.nar</code>.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--archive connectors/pulsar-io-debezium-postgres-2.6.4.nar \
--name debezium-postgres-source \
--destination-topic-name debezium-postgres-topic \
--tenant public \
--namespace default \
--<span class="hljs-built_in">source</span>-config <span class="hljs-string">'{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","database.server.name": "dbserver1","schema.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'</span>
</code></pre></li>
<li><p>Use the <strong>YAML</strong> configuration file as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--<span class="hljs-built_in">source</span>-config-file debezium-postgres-source-config.yaml
</code></pre></li>
</ul></li>
<li><p>Subscribe the topic <em>sub-products</em> for the <em>inventory.products</em> table.</p>
<pre><code class="hljs">$ bin/pulsar-<span class="hljs-keyword">client</span> consume -s <span class="hljs-string">"sub-products"</span> <span class="hljs-keyword">public</span>/<span class="hljs-keyword">default</span>/dbserver1.inventory.products -n <span class="hljs-number">0</span>
</code></pre></li>
<li><p>Start a PostgreSQL client in docker.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -it pulsar-postgresql /bin/bash
</code></pre></li>
<li><p>A PostgreSQL client pops out.</p>
<p>Use the following commands to change the data of the table <em>products</em>.</p>
<pre><code class="hljs">psql -U postgres postgres
postgres=<span class="hljs-comment"># \c postgres;</span>
You are now connected to database <span class="hljs-string">"postgres"</span> as user <span class="hljs-string">"postgres"</span>.
postgres=<span class="hljs-comment"># SET search_path TO inventory;</span>
SET
postgres=<span class="hljs-comment"># select * from products;</span>
id |<span class="hljs-string"> name </span>|<span class="hljs-string"> description </span>|<span class="hljs-string"> weight
-----+--------------------+---------------------------------------------------------+--------
102 </span>|<span class="hljs-string"> car battery </span>|<span class="hljs-string"> 12V car battery </span>|<span class="hljs-string"> 8.1
103 </span>|<span class="hljs-string"> 12-pack drill bits </span>|<span class="hljs-string"> 12-pack of drill bits with sizes ranging from #40 to #3 </span>|<span class="hljs-string"> 0.8
104 </span>|<span class="hljs-string"> hammer </span>|<span class="hljs-string"> 12oz carpenter's hammer </span>|<span class="hljs-string"> 0.75
105 </span>|<span class="hljs-string"> hammer </span>|<span class="hljs-string"> 14oz carpenter's hammer </span>|<span class="hljs-string"> 0.875
106 </span>|<span class="hljs-string"> hammer </span>|<span class="hljs-string"> 16oz carpenter's hammer </span>|<span class="hljs-string"> 1
107 </span>|<span class="hljs-string"> rocks </span>|<span class="hljs-string"> box of assorted rocks </span>|<span class="hljs-string"> 5.3
108 </span>|<span class="hljs-string"> jacket </span>|<span class="hljs-string"> water resistent black wind breaker </span>|<span class="hljs-string"> 0.1
109 </span>|<span class="hljs-string"> spare tire </span>|<span class="hljs-string"> 24 inch spare tire </span>|<span class="hljs-string"> 22.2
101 </span>|<span class="hljs-string"> 1111111111 </span>|<span class="hljs-string"> Small 2-wheel scooter </span>|<span class="hljs-string"> 3.14
(9 rows)
postgres=# UPDATE products SET name='1111111111' WHERE id=107;
UPDATE 1
</span></code></pre>
<p>In the terminal window of subscribing topic, you can receive the following messages.</p>
<pre><code class="hljs css language-bash">----- got message -----
{<span class="hljs-string">"schema"</span>:{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int32"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"id"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1.inventory.products.Key"</span>},<span class="hljs-string">"payload"</span>:{<span class="hljs-string">"id"</span>:107}}�{<span class="hljs-string">"schema"</span>:{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int32"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"id"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"name"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"description"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"double"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"weight"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1.inventory.products.Value"</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"before"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int32"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"id"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"name"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"description"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"double"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"weight"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1.inventory.products.Value"</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"after"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"version"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"connector"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"name"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"db"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"ts_usec"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"txId"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"lsn"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"schema"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"table"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"boolean"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"default"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"snapshot"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"boolean"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"last_snapshot_record"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"io.debezium.connector.postgresql.Source"</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"source"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"op"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"ts_ms"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1.inventory.products.Envelope"</span>},<span class="hljs-string">"payload"</span>:{<span class="hljs-string">"before"</span>:{<span class="hljs-string">"id"</span>:107,<span class="hljs-string">"name"</span>:<span class="hljs-string">"rocks"</span>,<span class="hljs-string">"description"</span>:<span class="hljs-string">"box of assorted rocks"</span>,<span class="hljs-string">"weight"</span>:5.3},<span class="hljs-string">"after"</span>:{<span class="hljs-string">"id"</span>:107,<span class="hljs-string">"name"</span>:<span class="hljs-string">"1111111111"</span>,<span class="hljs-string">"description"</span>:<span class="hljs-string">"box of assorted rocks"</span>,<span class="hljs-string">"weight"</span>:5.3},<span class="hljs-string">"source"</span>:{<span class="hljs-string">"version"</span>:<span class="hljs-string">"0.9.2.Final"</span>,<span class="hljs-string">"connector"</span>:<span class="hljs-string">"postgresql"</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1"</span>,<span class="hljs-string">"db"</span>:<span class="hljs-string">"postgres"</span>,<span class="hljs-string">"ts_usec"</span>:1559208957661080,<span class="hljs-string">"txId"</span>:577,<span class="hljs-string">"lsn"</span>:23862872,<span class="hljs-string">"schema"</span>:<span class="hljs-string">"inventory"</span>,<span class="hljs-string">"table"</span>:<span class="hljs-string">"products"</span>,<span class="hljs-string">"snapshot"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"last_snapshot_record"</span>:null},<span class="hljs-string">"op"</span>:<span class="hljs-string">"u"</span>,<span class="hljs-string">"ts_ms"</span>:1559208957692}}
</code></pre></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="example-of-mongodb"></a><a href="#example-of-mongodb" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Example of MongoDB</h2>
<p>You need to create a configuration file before using the Pulsar Debezium connector.</p>
<ul>
<li><p>JSON</p>
<pre><code class="hljs css language-json">{
<span class="hljs-attr">"mongodb.hosts"</span>: <span class="hljs-string">"rs0/mongodb:27017"</span>,
<span class="hljs-attr">"mongodb.name"</span>: <span class="hljs-string">"dbserver1"</span>,
<span class="hljs-attr">"mongodb.user"</span>: <span class="hljs-string">"debezium"</span>,
<span class="hljs-attr">"mongodb.password"</span>: <span class="hljs-string">"dbz"</span>,
<span class="hljs-attr">"mongodb.task.id"</span>: <span class="hljs-string">"1"</span>,
<span class="hljs-attr">"database.whitelist"</span>: <span class="hljs-string">"inventory"</span>,
<span class="hljs-attr">"pulsar.service.url"</span>: <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>
}
</code></pre></li>
<li><p>YAML</p>
<p>You can create a <code>debezium-mongodb-source-config.yaml</code> file and copy the <a href="https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml">contents</a> below to the <code>debezium-mongodb-source-config.yaml</code> file.</p>
<pre><code class="hljs css language-yaml"><span class="hljs-attr">tenant:</span> <span class="hljs-string">"public"</span>
<span class="hljs-attr">namespace:</span> <span class="hljs-string">"default"</span>
<span class="hljs-attr">name:</span> <span class="hljs-string">"debezium-mongodb-source"</span>
<span class="hljs-attr">topicName:</span> <span class="hljs-string">"debezium-mongodb-topic"</span>
<span class="hljs-attr">archive:</span> <span class="hljs-string">"connectors/pulsar-io-debezium-mongodb-<span class="hljs-template-variable">2.6.4</span>.nar"</span>
<span class="hljs-attr">parallelism:</span> <span class="hljs-number">1</span>
<span class="hljs-attr">configs:</span>
<span class="hljs-comment">## config for pg, docker image: debezium/example-mongodb:0.10</span>
<span class="hljs-attr">mongodb.hosts:</span> <span class="hljs-string">"rs0/mongodb:27017"</span><span class="hljs-string">,</span>
<span class="hljs-attr">mongodb.name:</span> <span class="hljs-string">"dbserver1"</span><span class="hljs-string">,</span>
<span class="hljs-attr">mongodb.user:</span> <span class="hljs-string">"debezium"</span><span class="hljs-string">,</span>
<span class="hljs-attr">mongodb.password:</span> <span class="hljs-string">"dbz"</span><span class="hljs-string">,</span>
<span class="hljs-attr">mongodb.task.id:</span> <span class="hljs-string">"1"</span><span class="hljs-string">,</span>
<span class="hljs-attr">database.whitelist:</span> <span class="hljs-string">"inventory"</span><span class="hljs-string">,</span>
<span class="hljs-comment">## PULSAR_SERVICE_URL_CONFIG</span>
<span class="hljs-attr">pulsar.service.url:</span> <span class="hljs-string">"pulsar://127.0.0.1:6650"</span>
</code></pre></li>
</ul>
<h3><a class="anchor" aria-hidden="true" id="usage-2"></a><a href="#usage-2" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Usage</h3>
<p>This example shows how to change the data of a MongoDB table using the Pulsar Debezium connector.</p>
<ol>
<li><p>Start a MongoDB server with a database from which Debezium can capture changes.</p>
<pre><code class="hljs css language-bash">$ docker pull debezium/example-mongodb:0.10
$ docker run -d -it --rm --name pulsar-mongodb -e MONGODB_USER=mongodb -e MONGODB_PASSWORD=mongodb -p 27017:27017 debezium/example-mongodb:0.10
</code></pre>
<p>Use the following commands to initialize the data.</p>
<pre><code class="hljs css language-bash">./usr/<span class="hljs-built_in">local</span>/bin/init-inventory.sh
</code></pre>
<p>If the local host cannot access the container network, you can update the file <code>/etc/hosts</code> and add a rule <code>127.0.0.1 6 f114527a95f</code>. f114527a95f is container id, you can try to get by <code>docker ps -a</code></p></li>
</ol>
<ol start="2">
<li><p>Start a Pulsar service locally in standalone mode.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar standalone
</code></pre></li>
<li><p>Start the Pulsar Debezium connector in local run mode using one of the following methods.</p>
<ul>
<li><p>Use the <strong>JSON</strong> configuration file as shown previously.</p>
<p>Make sure the nar file is available at <code>connectors/pulsar-io-mongodb-2.6.4.nar</code>.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--archive connectors/pulsar-io-debezium-mongodb-2.6.4.nar \
--name debezium-mongodb-source \
--destination-topic-name debezium-mongodb-topic \
--tenant public \
--namespace default \
--<span class="hljs-built_in">source</span>-config <span class="hljs-string">'{"mongodb.hosts": "rs0/mongodb:27017","mongodb.name": "dbserver1","mongodb.user": "debezium","mongodb.password": "dbz","mongodb.task.id": "1","database.whitelist": "inventory","pulsar.service.url": "pulsar://127.0.0.1:6650"}'</span>
</code></pre></li>
<li><p>Use the <strong>YAML</strong> configuration file as shown previously.</p>
<pre><code class="hljs css language-bash">$ bin/pulsar-admin <span class="hljs-built_in">source</span> localrun \
--<span class="hljs-built_in">source</span>-config-file debezium-mongodb-source-config.yaml
</code></pre></li>
</ul></li>
<li><p>Subscribe the topic <em>sub-products</em> for the <em>inventory.products</em> table.</p>
<pre><code class="hljs">$ bin/pulsar-<span class="hljs-keyword">client</span> consume -s <span class="hljs-string">"sub-products"</span> <span class="hljs-keyword">public</span>/<span class="hljs-keyword">default</span>/dbserver1.inventory.products -n <span class="hljs-number">0</span>
</code></pre></li>
<li><p>Start a MongoDB client in docker.</p>
<pre><code class="hljs css language-bash">$ docker <span class="hljs-built_in">exec</span> -it pulsar-mongodb /bin/bash
</code></pre></li>
<li><p>A MongoDB client pops out.</p>
<pre><code class="hljs css language-bash">mongo -u debezium -p dbz --authenticationDatabase admin localhost:27017/inventory
db.products.update({<span class="hljs-string">"_id"</span>:NumberLong(104)},{<span class="hljs-variable">$set</span>:{weight:1.25}})
</code></pre>
<p>In the terminal window of subscribing topic, you can receive the following messages.</p>
<pre><code class="hljs css language-bash">----- got message -----
{<span class="hljs-string">"schema"</span>:{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"id"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1.inventory.products.Key"</span>},<span class="hljs-string">"payload"</span>:{<span class="hljs-string">"id"</span>:<span class="hljs-string">"104"</span>}}, value = {<span class="hljs-string">"schema"</span>:{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"io.debezium.data.Json"</span>,<span class="hljs-string">"version"</span>:1,<span class="hljs-string">"field"</span>:<span class="hljs-string">"after"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"io.debezium.data.Json"</span>,<span class="hljs-string">"version"</span>:1,<span class="hljs-string">"field"</span>:<span class="hljs-string">"patch"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"struct"</span>,<span class="hljs-string">"fields"</span>:[{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"version"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"connector"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"name"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"ts_ms"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"io.debezium.data.Enum"</span>,<span class="hljs-string">"version"</span>:1,<span class="hljs-string">"parameters"</span>:{<span class="hljs-string">"allowed"</span>:<span class="hljs-string">"true,last,false"</span>},<span class="hljs-string">"default"</span>:<span class="hljs-string">"false"</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"snapshot"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"db"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"rs"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"collection"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int32"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"ord"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"h"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"io.debezium.connector.mongo.Source"</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"source"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"string"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"op"</span>},{<span class="hljs-string">"type"</span>:<span class="hljs-string">"int64"</span>,<span class="hljs-string">"optional"</span>:<span class="hljs-literal">true</span>,<span class="hljs-string">"field"</span>:<span class="hljs-string">"ts_ms"</span>}],<span class="hljs-string">"optional"</span>:<span class="hljs-literal">false</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1.inventory.products.Envelope"</span>},<span class="hljs-string">"payload"</span>:{<span class="hljs-string">"after"</span>:<span class="hljs-string">"{\"_id\": {\"<span class="hljs-variable">$numberLong</span>\": \"104\"},\"name\": \"hammer\",\"description\": \"12oz carpenter's hammer\",\"weight\": 1.25,\"quantity\": 4}"</span>,<span class="hljs-string">"patch"</span>:null,<span class="hljs-string">"source"</span>:{<span class="hljs-string">"version"</span>:<span class="hljs-string">"0.10.0.Final"</span>,<span class="hljs-string">"connector"</span>:<span class="hljs-string">"mongodb"</span>,<span class="hljs-string">"name"</span>:<span class="hljs-string">"dbserver1"</span>,<span class="hljs-string">"ts_ms"</span>:1573541905000,<span class="hljs-string">"snapshot"</span>:<span class="hljs-string">"true"</span>,<span class="hljs-string">"db"</span>:<span class="hljs-string">"inventory"</span>,<span class="hljs-string">"rs"</span>:<span class="hljs-string">"rs0"</span>,<span class="hljs-string">"collection"</span>:<span class="hljs-string">"products"</span>,<span class="hljs-string">"ord"</span>:1,<span class="hljs-string">"h"</span>:4983083486544392763},<span class="hljs-string">"op"</span>:<span class="hljs-string">"r"</span>,<span class="hljs-string">"ts_ms"</span>:1573541909761}}.
</code></pre></li>
</ol>
<h2><a class="anchor" aria-hidden="true" id="faq"></a><a href="#faq" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>FAQ</h2>
<h3><a class="anchor" aria-hidden="true" id="debezium-postgres-connector-will-hang-when-create-snap"></a><a href="#debezium-postgres-connector-will-hang-when-create-snap" aria-hidden="true" class="hash-link"><svg class="hash-link-icon" aria-hidden="true" height="16" version="1.1" viewBox="0 0 16 16" width="16"><path fill-rule="evenodd" d="M4 9h1v1H4c-1.5 0-3-1.69-3-3.5S2.55 3 4 3h4c1.45 0 3 1.69 3 3.5 0 1.41-.91 2.72-2 3.25V8.59c.58-.45 1-1.27 1-2.09C10 5.22 8.98 4 8 4H4c-.98 0-2 1.22-2 2.5S3 9 4 9zm9-3h-1v1h1c1 0 2 1.22 2 2.5S13.98 12 13 12H9c-.98 0-2-1.22-2-2.5 0-.83.42-1.64 1-2.09V6.25c-1.09.53-2 1.84-2 3.25C6 11.31 7.55 13 9 13h4c1.45 0 3-1.69 3-3.5S14.5 6 13 6z"></path></svg></a>Debezium postgres connector will hang when create snap</h3>
<pre><code class="hljs css language-$xslt">#<span class="hljs-number">18</span> prio=<span class="hljs-number">5</span> os_prio=<span class="hljs-number">31</span> tid=<span class="hljs-number">0</span>x00007fd83096f800 nid=<span class="hljs-number">0</span>xa403 waiting on condition [<span class="hljs-number">0</span>x000070000f534000]
java<span class="hljs-selector-class">.lang</span><span class="hljs-selector-class">.Thread</span><span class="hljs-selector-class">.State</span>: WAITING (parking)
at sun<span class="hljs-selector-class">.misc</span><span class="hljs-selector-class">.Unsafe</span>.park(Native Method)
- parking to wait <span class="hljs-keyword">for</span> &lt;<span class="hljs-number">0</span>x00000007ab025a58&gt; (<span class="hljs-selector-tag">a</span> java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.locks</span>.AbstractQueuedSynchronizer<span class="hljs-variable">$ConditionObject</span>)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.locks</span><span class="hljs-selector-class">.LockSupport</span>.park(LockSupport<span class="hljs-selector-class">.java</span>:<span class="hljs-number">175</span>)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.locks</span>.AbstractQueuedSynchronizer<span class="hljs-variable">$ConditionObject</span>.await(AbstractQueuedSynchronizer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">2039</span>)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.LinkedBlockingDeque</span>.putLast(LinkedBlockingDeque<span class="hljs-selector-class">.java</span>:<span class="hljs-number">396</span>)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.LinkedBlockingDeque</span>.put(LinkedBlockingDeque<span class="hljs-selector-class">.java</span>:<span class="hljs-number">649</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.base</span><span class="hljs-selector-class">.ChangeEventQueue</span>.enqueue(ChangeEventQueue<span class="hljs-selector-class">.java</span>:<span class="hljs-number">132</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span>.PostgresConnectorTask$<span class="hljs-variable">$Lambda</span>$<span class="hljs-number">203</span>/<span class="hljs-number">385424085</span>.accept(Unknown Source)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.RecordsSnapshotProducer</span>.sendCurrentRecord(RecordsSnapshotProducer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">402</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.RecordsSnapshotProducer</span>.readTable(RecordsSnapshotProducer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">321</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.RecordsSnapshotProducer</span>.lambda<span class="hljs-variable">$takeSnapshot</span>$<span class="hljs-number">6</span>(RecordsSnapshotProducer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">226</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span>.RecordsSnapshotProducer$<span class="hljs-variable">$Lambda</span>$<span class="hljs-number">240</span>/<span class="hljs-number">1347039967</span>.accept(Unknown Source)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.jdbc</span><span class="hljs-selector-class">.JdbcConnection</span>.queryWithBlockingConsumer(JdbcConnection<span class="hljs-selector-class">.java</span>:<span class="hljs-number">535</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.RecordsSnapshotProducer</span>.takeSnapshot(RecordsSnapshotProducer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">224</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.RecordsSnapshotProducer</span>.lambda<span class="hljs-variable">$start</span>$<span class="hljs-number">0</span>(RecordsSnapshotProducer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">87</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span>.RecordsSnapshotProducer$<span class="hljs-variable">$Lambda</span>$<span class="hljs-number">206</span>/<span class="hljs-number">589332928</span>.run(Unknown Source)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.CompletableFuture</span>.uniRun(CompletableFuture<span class="hljs-selector-class">.java</span>:<span class="hljs-number">705</span>)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.CompletableFuture</span>.uniRunStage(CompletableFuture<span class="hljs-selector-class">.java</span>:<span class="hljs-number">717</span>)
at java<span class="hljs-selector-class">.util</span><span class="hljs-selector-class">.concurrent</span><span class="hljs-selector-class">.CompletableFuture</span>.thenRun(CompletableFuture<span class="hljs-selector-class">.java</span>:<span class="hljs-number">2010</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.RecordsSnapshotProducer</span>.start(RecordsSnapshotProducer<span class="hljs-selector-class">.java</span>:<span class="hljs-number">87</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.postgresql</span><span class="hljs-selector-class">.PostgresConnectorTask</span>.start(PostgresConnectorTask<span class="hljs-selector-class">.java</span>:<span class="hljs-number">126</span>)
at io<span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.connector</span><span class="hljs-selector-class">.common</span><span class="hljs-selector-class">.BaseSourceTask</span>.start(BaseSourceTask<span class="hljs-selector-class">.java</span>:<span class="hljs-number">47</span>)
at org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.pulsar</span><span class="hljs-selector-class">.io</span><span class="hljs-selector-class">.kafka</span><span class="hljs-selector-class">.connect</span><span class="hljs-selector-class">.KafkaConnectSource</span>.open(KafkaConnectSource<span class="hljs-selector-class">.java</span>:<span class="hljs-number">127</span>)
at org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.pulsar</span><span class="hljs-selector-class">.io</span><span class="hljs-selector-class">.debezium</span><span class="hljs-selector-class">.DebeziumSource</span>.open(DebeziumSource<span class="hljs-selector-class">.java</span>:<span class="hljs-number">100</span>)
at org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.pulsar</span><span class="hljs-selector-class">.functions</span><span class="hljs-selector-class">.instance</span><span class="hljs-selector-class">.JavaInstanceRunnable</span>.setupInput(JavaInstanceRunnable<span class="hljs-selector-class">.java</span>:<span class="hljs-number">690</span>)
at org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.pulsar</span><span class="hljs-selector-class">.functions</span><span class="hljs-selector-class">.instance</span><span class="hljs-selector-class">.JavaInstanceRunnable</span>.setupJavaInstance(JavaInstanceRunnable<span class="hljs-selector-class">.java</span>:<span class="hljs-number">200</span>)
at org<span class="hljs-selector-class">.apache</span><span class="hljs-selector-class">.pulsar</span><span class="hljs-selector-class">.functions</span><span class="hljs-selector-class">.instance</span><span class="hljs-selector-class">.JavaInstanceRunnable</span>.run(JavaInstanceRunnable<span class="hljs-selector-class">.java</span>:<span class="hljs-number">230</span>)
at java<span class="hljs-selector-class">.lang</span><span class="hljs-selector-class">.Thread</span>.run(Thread<span class="hljs-selector-class">.java</span>:<span class="hljs-number">748</span>)
</code></pre>
<p>If you encounter the above problems in synchronizing data, please refer to <a href="https://github.com/apache/pulsar/issues/4075">this</a> and add the following configuration to the configuration file:</p>
<pre><code class="hljs css language-$xslt"><span class="hljs-built_in">max</span>.<span class="hljs-built_in">queue</span>.<span class="hljs-built_in">size</span>=
</code></pre>
</span></div></article></div><div class="docs-prevnext"></div></div></div><nav class="onPageNav"><ul class="toc-headings"><li><a href="#configuration">Configuration</a><ul class="toc-headings"><li><a href="#converter-options">Converter Options</a></li><li><a href="#mongodb-configuration">MongoDB Configuration</a></li></ul></li><li><a href="#example-of-mysql">Example of MySQL</a><ul class="toc-headings"><li><a href="#configuration-1">Configuration</a></li><li><a href="#usage">Usage</a></li></ul></li><li><a href="#example-of-postgresql">Example of PostgreSQL</a><ul class="toc-headings"><li><a href="#configuration-2">Configuration</a></li><li><a href="#usage-1">Usage</a></li></ul></li><li><a href="#example-of-mongodb">Example of MongoDB</a><ul class="toc-headings"><li><a href="#usage-2">Usage</a></li></ul></li><li><a href="#faq">FAQ</a><ul class="toc-headings"><li><a href="#debezium-postgres-connector-will-hang-when-create-snap">Debezium postgres connector will hang when create snap</a></li></ul></li></ul></nav></div><footer class="nav-footer" id="footer"><section class="copyright">Copyright © 2022 The Apache Software Foundation. All Rights Reserved. Apache, Apache Pulsar and the Apache feather logo are trademarks of The Apache Software Foundation.</section><span><script>
const community = document.querySelector("a[href='#community']").parentNode;
const communityMenu =
'<li>' +
'<a id="community-menu" href="#">Community <span style="font-size: 0.75em">&nbsp;▼</span></a>' +
'<div id="community-dropdown" class="hide">' +
'<ul id="community-dropdown-items">' +
'<li><a href="/en/contact">Contact</a></li>' +
'<li><a href="/en/contributing">Contributing</a></li>' +
'<li><a href="/en/coding-guide">Coding guide</a></li>' +
'<li><a href="/en/events">Events</a></li>' +
'<li><a href="https://twitter.com/Apache_Pulsar" target="_blank">Twitter &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/wiki" target="_blank">Wiki &#x2750</a></li>' +
'<li><a href="https://github.com/apache/pulsar/issues" target="_blank">Issue tracking &#x2750</a></li>' +
'<li><a href="https://pulsar-summit.org/" target="_blank">Pulsar Summit &#x2750</a></li>' +
'<li>&nbsp;</li>' +
'<li><a href="/en/resources">Resources</a></li>' +
'<li><a href="/en/team">Team</a></li>' +
'<li><a href="/en/powered-by">Powered By</a></li>' +
'</ul>' +
'</div>' +
'</li>';
community.innerHTML = communityMenu;
const communityMenuItem = document.getElementById("community-menu");
const communityDropDown = document.getElementById("community-dropdown");
communityMenuItem.addEventListener("click", function(event) {
event.preventDefault();
if (communityDropDown.className == 'hide') {
communityDropDown.className = 'visible';
} else {
communityDropDown.className = 'hide';
}
});
</script></span></footer></div><script>window.twttr=(function(d,s, id){var js,fjs=d.getElementsByTagName(s)[0],t=window.twttr||{};if(d.getElementById(id))return t;js=d.createElement(s);js.id=id;js.src='https://platform.twitter.com/widgets.js';fjs.parentNode.insertBefore(js, fjs);t._e = [];t.ready = function(f) {t._e.push(f);};return t;}(document, 'script', 'twitter-wjs'));</script></body></html>