[Issue 5474][pulsar-io-debezium] Support CDC Connector for MongoDB (#5590)

* support mongodb connector

* add tester

* add tester

* add tester

* add  license header

* fix by some comments

* add jdbc source sink

* add init data

* fix code style and default port 27017 of mongodb
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 381d312..68460b0 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -66,6 +66,7 @@
     <file><source>${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/debezium/postgres/target/pulsar-io-debezium-postgres-${project.version}.nar</source></file>
+    <file><source>${basedir}/../../pulsar-io/debezium/mongodb/target/pulsar-io-debezium-mongodb-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/influxdb/target/pulsar-io-influxdb-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/redis/target/pulsar-io-redis-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/flume/target/pulsar-io-flume-${project.version}.nar</source></file>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
new file mode 100644
index 0000000..5385b6a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+public class DebeziumMongoDbContainer extends ChaosContainer<DebeziumMongoDbContainer> {
+
+    public static final String NAME = "debezium-mongodb-example";
+
+    public static final Integer[] PORTS = { 27017 };
+    private static final String IMAGE_NAME = "debezium/example-mongodb:0.10";
+
+    public DebeziumMongoDbContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+        this.withEnv("MONGODB_USER", "mongodb");
+        this.withEnv("MONGODB_PASSWORD", "mongodb");
+    }
+    @Override
+    public String getContainerName() {
+        return clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+                .withExposedPorts(PORTS)
+                .withCreateContainerCmdModifier(createContainerCmd -> {
+                    createContainerCmd.withHostName(NAME);
+                    createContainerCmd.withName(getContainerName());
+                })
+                .waitingFor(new HostPortWaitStrategy());
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 765bd19..19c0830 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
+import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
 import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
@@ -133,6 +134,11 @@
         testDebeziumPostgreSqlConnect();
     }
 
+    @Test
+    public void testDebeziumMongoDbSource() throws Exception{
+        testDebeziumMongoDbConnect();
+    }
+
     private void testSink(SinkTester tester, boolean builtin) throws Exception {
         tester.startServiceContainer(pulsarCluster);
         try {
@@ -2167,4 +2173,60 @@
         getSourceInfoNotFound(tenant, namespace, sourceName);
     }
 
+    private  void testDebeziumMongoDbConnect() throws Exception {
+
+        final String tenant = TopicName.PUBLIC_TENANT;
+        final String namespace = TopicName.DEFAULT_NAMESPACE;
+        final String outputTopicName = "debe-output-topic-name";
+        final String consumeTopicName = "public/default/dbserver1.inventory.products";
+        final String sourceName = "test-source-connector-"
+                + functionRuntimeType + "-name-" + randomName(8);
+
+        // This is the binlog count that contained in mongodb container.
+        final int numMessages = 17;
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup
+        Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
+                .topic(consumeTopicName)
+                .subscriptionName("debezium-source-tester")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+
+        @Cleanup
+        DebeziumMongoDbSourceTester sourceTester = new DebeziumMongoDbSourceTester(pulsarCluster);
+
+        // setup debezium mongodb server
+        DebeziumMongoDbContainer mongoDbContainer = new DebeziumMongoDbContainer(pulsarCluster.getClusterName());
+        sourceTester.setServiceContainer(mongoDbContainer);
+        // prepare the testing environment for source
+        prepareSource(sourceTester);
+
+        // submit the source connector
+        submitSourceConnector(sourceTester, tenant, namespace, sourceName, outputTopicName);
+
+        // get source info
+        getSourceInfoSuccess(sourceTester, tenant, namespace, sourceName);
+
+        // get source status
+        Failsafe.with(statusRetryPolicy).run(() -> getSourceStatus(tenant, namespace, sourceName));
+
+        // wait for source to process messages
+        Failsafe.with(statusRetryPolicy).run(() ->
+                waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages));
+
+        // validate the source result
+        sourceTester.validateSourceResult(consumer, 9);
+
+        // delete the source
+        deleteSource(tenant, namespace, sourceName);
+
+        // get source info (source should be deleted)
+        getSourceInfoNotFound(tenant, namespace, sourceName);
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
new file mode 100644
index 0000000..3723b0a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMongoDbSourceTester.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+
+import java.io.Closeable;
+import java.util.Map;
+
+@Slf4j
+public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbContainer> implements Closeable {
+
+    private static final String NAME = "debezium-mongodb";
+
+    private final String pulsarServiceUrl;
+
+    @Getter
+    private DebeziumMongoDbContainer debeziumMongoDbContainer;
+
+    private final PulsarCluster pulsarCluster;
+    public DebeziumMongoDbSourceTester(PulsarCluster cluster) {
+        super(NAME);
+        this.pulsarCluster = cluster;
+        pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
+
+        sourceConfig.put("mongodb.hosts", "rs0/" + DebeziumMongoDbContainer.NAME + ":27017");
+        sourceConfig.put("mongodb.name", "dbserver1");
+        sourceConfig.put("mongodb.user", "debezium");
+        sourceConfig.put("mongodb.password", "dbz");
+        sourceConfig.put("mongodb.task.id","1");
+        sourceConfig.put("database.whitelist", "inventory");
+        sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
+    }
+
+    @Override
+    public void setServiceContainer(DebeziumMongoDbContainer container) {
+        log.info("start debezium mongodb server container.");
+        debeziumMongoDbContainer = container;
+        pulsarCluster.startService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+    }
+
+    @Override
+    public void prepareSource() throws Exception {
+        this.debeziumMongoDbContainer.execCmd( "bash", "-c", "/usr/local/bin/init-inventory.sh");
+        log.info("debezium mongodb server already contains preconfigured data.");
+    }
+
+    @Override
+    public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
+        log.info("debezium mongodb server already contains preconfigured data.");
+        return null;
+    }
+
+    @Override
+    public String valueContains() {
+        return "dbserver1.inventory.products.Envelope";
+    }
+
+    @Override
+    public void close() {
+        if (pulsarCluster != null) {
+            pulsarCluster.stopService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 580bf2c..1432c62 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -86,24 +86,6 @@
         return null;
     }
 
-    public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
-        int recordsNumber = 0;
-        Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
-        while(msg != null) {
-            recordsNumber ++;
-            final String key = new String(msg.getValue().getKey());
-            final String value = new String(msg.getValue().getValue());
-            log.info("Received message: key = {}, value = {}.", key, value);
-            Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
-            Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
-            consumer.acknowledge(msg);
-            msg = consumer.receive(1, TimeUnit.SECONDS);
-        }
-
-        Assert.assertEquals(recordsNumber, number);
-        log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
-    }
-
     @Override
     public void close() {
         if (pulsarCluster != null) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index fca631f..e99c280 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -88,24 +88,6 @@
         return null;
     }
 
-    public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
-        int recordsNumber = 0;
-        Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
-        while(msg != null) {
-            recordsNumber ++;
-            final String key = new String(msg.getValue().getKey());
-            final String value = new String(msg.getValue().getValue());
-            log.info("Received message: key = {}, value = {}.", key, value);
-            Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
-            Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
-            consumer.acknowledge(msg);
-            msg = consumer.receive(1, TimeUnit.SECONDS);
-        }
-
-        Assert.assertEquals(recordsNumber, number);
-        log.info("Stop debezium postgresql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber);
-    }
-
     @Override
     public void close() {
         if (pulsarCluster != null) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index f1feb70..3b16a38 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -19,14 +19,22 @@
 package org.apache.pulsar.tests.integration.io;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.testcontainers.containers.GenericContainer;
+import org.testng.Assert;
 import org.testng.collections.Maps;
 
 /**
  * A tester used for testing a specific source.
  */
 @Getter
+@Slf4j
 public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
 
     protected final String sourceType;
@@ -51,4 +59,27 @@
 
     public abstract Map<String, String> produceSourceMessages(int numMessages) throws Exception;
 
+    public void validateSourceResult(Consumer<KeyValue<byte[], byte[]>> consumer, int number) throws Exception {
+        int recordsNumber = 0;
+        Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
+        while(msg != null) {
+            recordsNumber ++;
+            final String key = new String(msg.getValue().getKey());
+            final String value = new String(msg.getValue().getValue());
+            log.info("Received message: key = {}, value = {}.", key, value);
+            Assert.assertTrue(key.contains(this.keyContains()));
+            Assert.assertTrue(value.contains(this.valueContains()));
+            consumer.acknowledge(msg);
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+        }
+
+        Assert.assertEquals(recordsNumber, number);
+        log.info("Stop {} server container. topic: {} has {} records.", getSourceType(), consumer.getTopic(), recordsNumber);
+    }
+    public String keyContains(){
+        return "dbserver1.inventory.products.Key";
+    }
+    public String valueContains(){
+        return "dbserver1.inventory.products.Value";
+    }
 }