[Issue 5474][pulsar-io-debezium] Support CDC Connector for MongoDB (#5590)
* support mongodb connector
* add tester
* add tester
* add tester
* add license header
* address review comments
* add jdbc source sink
* add init data
* fix code style and default port 27017 of mongodb
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 381d312..68460b0 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -66,6 +66,7 @@
<file><source>${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
+ <file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/flume/target/pulsar-io-flume-${project.version}.nar</source></file>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
new file mode 100644
index 0000000..5385b6a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+public class DebeziumMongoDbContainer extends ChaosContainer<DebeziumMongoDbContainer> { // Test container running the Debezium example MongoDB image for the integration tests.
+
+ public static final String NAME = "debezium-mongodb-example"; // used as both network alias and container hostname in configure()
+
+ public static final Integer[] PORTS = { 27017 }; // ports exposed by configure(); 27017 is the MongoDB default port
+ private static final String IMAGE_NAME = "debezium/example-mongodb:0.10"; // image reference, pinned to tag 0.10
+
+ public DebeziumMongoDbContainer(String clusterName) { // clusterName is forwarded to ChaosContainer together with the image name
+ super(clusterName, IMAGE_NAME);
+ this.withEnv("MONGODB_USER", "mongodb"); // environment variables handed to the container at start
+ this.withEnv("MONGODB_PASSWORD", "mongodb"); // NOTE(review): how the image consumes these is not visible here - confirm
+ }
+ @Override
+ public String getContainerName() { // name given to the Docker container by configure()
+ return clusterName; // not declared in this class - presumably a field inherited from ChaosContainer
+ }
+
+ @Override
+ protected void configure() { // applies network alias, exposed ports, hostname/container name and the readiness check
+ super.configure();
+ this.withNetworkAliases(NAME)
+ .withExposedPorts(PORTS)
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME); // hostname inside the container matches the network alias
+ createContainerCmd.withName(getContainerName());
+ })
+ .waitingFor(new HostPortWaitStrategy()); // readiness is decided by a host/port wait strategy on the exposed port(s)
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 765bd19..19c0830 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,6 +45,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
+import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -133,6 +134,11 @@
testDebeziumPostgreSqlConnect();
}
+ @Test
+ public void testDebeziumMongoDbSource() throws Exception{ // TestNG entry point for the Debezium MongoDB source scenario
+ testDebeziumMongoDbConnect(); // the whole scenario lives in this private helper
+ }
+
private void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.startServiceContainer(pulsarCluster);
try {
@@ -2167,4 +2173,60 @@
getSourceInfoNotFound(tenant, namespace, sourceName);
}
+ private void testDebeziumMongoDbConnect() throws Exception { // End-to-end check: start MongoDB, submit the Debezium source, validate emitted records, delete the source.
+
+ final String tenant = TopicName.PUBLIC_TENANT;
+ final String namespace = TopicName.DEFAULT_NAMESPACE;
+ final String outputTopicName = "debe-output-topic-name"; // topic name passed to submitSourceConnector; not the topic consumed below
+ final String consumeTopicName = "public/default/dbserver1.inventory.products"; // presumably <mongodb.name>.<database>.<collection> - confirm against connector docs
+ final String sourceName = "test-source-connector-"
+ + functionRuntimeType + "-name-" + randomName(8);
+
+ // Number of messages the source is expected to process from the data loaded into the mongodb container.
+ final int numMessages = 17;
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+
+ @Cleanup
+ Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes()) // subscribed before the source is submitted
+ .topic(consumeTopicName)
+ .subscriptionName("debezium-source-tester")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ @Cleanup
+ DebeziumMongoDbSourceTester sourceTester = new DebeziumMongoDbSourceTester(pulsarCluster); // @Cleanup calls close() on the tester when the method exits
+
+ // setup debezium mongodb server
+ DebeziumMongoDbContainer mongoDbContainer = new DebeziumMongoDbContainer(pulsarCluster.getClusterName());
+ sourceTester.setServiceContainer(mongoDbContainer);
+ // prepare the testing environment for source
+ prepareSource(sourceTester);
+
+ // submit the source connector
+ submitSourceConnector(sourceTester, tenant, namespace, sourceName, outputTopicName);
+
+ // get source info
+ getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
+
+ // get source status
+ Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
+
+ // wait for source to process messages
+ Failsafe.with(statusRetryPolicy).run(() ->
+ waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
+
+ // validate the source result
+ sourceTester.validateSourceResult(consumer, 9); // NOTE(review): 9 records expected on the products topic vs 17 processed overall - presumably the rest go to other topics; confirm
+
+ // delete the source
+ deleteSource(tenant, namespace, sourceName);
+
+ // get source info (source should be deleted)
+ getSourceInfoNotFound(tenant, namespace, sourceName);
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
new file mode 100644
index 0000000..3723b0a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+
+import java.io.Closeable;
+import java.util.Map;
+
+@Slf4j
+public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbContainer> implements Closeable { // Source tester for the Debezium MongoDB connector; owns the MongoDB service container lifecycle.
+
+ private static final String NAME = "debezium-mongodb"; // passed to the SourceTester constructor - presumably the source type; confirm
+
+ private final String pulsarServiceUrl; // value of the "pulsar.service.url" source config entry
+
+ @Getter
+ private DebeziumMongoDbContainer debeziumMongoDbContainer; // null until setServiceContainer() is called
+
+ private final PulsarCluster pulsarCluster; // cluster used to start and stop the MongoDB service container
+ public DebeziumMongoDbSourceTester(PulsarCluster cluster) { // builds the connector configuration; no container is started here
+ super(NAME);
+ this.pulsarCluster = cluster;
+ pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
+
+ sourceConfig.put("mongodb.hosts", "rs0/" + DebeziumMongoDbContainer.NAME + ":27017"); // NOTE(review): "rs0" is presumably the replica set name; port duplicates DebeziumMongoDbContainer.PORTS - confirm
+ sourceConfig.put("mongodb.name", "dbserver1");
+ sourceConfig.put("mongodb.user", "debezium"); // NOTE(review): differs from the container's MONGODB_USER env value - presumably created by the init script; confirm
+ sourceConfig.put("mongodb.password", "dbz");
+ sourceConfig.put("mongodb.task.id","1");
+ sourceConfig.put("database.whitelist", "inventory");
+ sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+ }
+
+ @Override
+ public void setServiceContainer(DebeziumMongoDbContainer container) { // stores the container and starts it through the cluster
+ log.info("start debezium mongodb server container.");
+ debeziumMongoDbContainer = container;
+ pulsarCluster.startService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+ }
+
+ @Override
+ public void prepareSource() throws Exception { // requires setServiceContainer() to have been called first
+ this.debeziumMongoDbContainer.execCmd( "bash", "-c", "/usr/local/bin/init-inventory.sh"); // runs the init script inside the container
+ log.info("debezium mongodb server already contains preconfigured data."); // NOTE(review): logged right after the init script runs - wording may be stale
+ }
+
+ @Override
+ public Map<String, String> produceSourceMessages(int numMessages) throws Exception { // produces nothing; numMessages is ignored
+ log.info("debezium mongodb server already contains preconfigured data.");
+ return null; // callers receive null, not an empty map
+ }
+
+ @Override
+ public String valueContains() { // substring every received value must contain (see SourceTester.validateSourceResult)
+ return "dbserver1.inventory.products.Envelope";
+ }
+
+ @Override
+ public void close() { // stops the MongoDB service container via the cluster
+ if (pulsarCluster != null) { // only null if the constructor was given a null cluster
+ pulsarCluster.stopService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+ }
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 580bf2c..1432c62 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -86,24 +86,6 @@
return null;
}
- public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
- int recordsNumber = 0;
- Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
- while(msg != null) {
- recordsNumber ++;
- final String key = new String(msg.getValue().getKey());
- final String value = new String(msg.getValue().getValue());
- log.info("Received message: key = {}, value = {}.", key, value);
- Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
- Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
- consumer.acknowledge(msg);
- msg = consumer.receive(1, TimeUnit.SECONDS);
- }
-
- Assert.assertEquals(recordsNumber, number);
- log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
- }
-
@Override
public void close() {
if (pulsarCluster != null) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index fca631f..e99c280 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -88,24 +88,6 @@
return null;
}
- public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
- int recordsNumber = 0;
- Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
- while(msg != null) {
- recordsNumber ++;
- final String key = new String(msg.getValue().getKey());
- final String value = new String(msg.getValue().getValue());
- log.info("Received message: key = {}, value = {}.", key, value);
- Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
- Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
- consumer.acknowledge(msg);
- msg = consumer.receive(1, TimeUnit.SECONDS);
- }
-
- Assert.assertEquals(recordsNumber, number);
- log.info("Stop debezium postgresql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
- }
-
@Override
public void close() {
if (pulsarCluster != null) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index f1feb70..3b16a38 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -19,14 +19,22 @@
package org.apache.pulsar.tests.integration.io;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.schema.KeyValue;
import org.testcontainers.containers.GenericContainer;
+import org.testng.Assert;
import org.testng.collections.Maps;
/**
* A tester used for testing a specific source.
*/
@Getter
+@Slf4j
public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected final String sourceType;
@@ -51,4 +59,27 @@
public abstract Map<String, String> produceSourceMessages(int numMessages) throws Exception;
+ public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception { // Drains the consumer, checks each key/value, then asserts exactly `number` records were received.
+ int recordsNumber = 0;
+ Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS); // first receive waits up to 2s; a null message ends the loop
+ while(msg != null) {
+ recordsNumber ++;
+ final String key = new String(msg.getValue().getKey()); // NOTE(review): decodes with the platform default charset - consider an explicit charset
+ final String value = new String(msg.getValue().getValue());
+ log.info("Received message: key = {}, value = {}.", key, value);
+ Assert.assertTrue(key.contains(this.keyContains())); // expected substrings come from the overridable keyContains()/valueContains()
+ Assert.assertTrue(value.contains(this.valueContains()));
+ consumer.acknowledge(msg);
+ msg = consumer.receive(1, TimeUnit.SECONDS); // subsequent receives wait up to 1s
+ }
+
+ Assert.assertEquals(recordsNumber, number); // TestNG argument order is (actual, expected)
+ log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber); // only logs; nothing is stopped in this method
+ }
+ public String keyContains(){ // substring that validateSourceResult requires in every record key; subclasses may override
+ return "dbserver1.inventory.products.Key";
+ }
+ public String valueContains(){ // substring that validateSourceResult requires in every record value; subclasses may override
+ return "dbserver1.inventory.products.Value";
+ }
}