[Issue 5474][pulsar-io-debezium] Support CDC Connector for MongoDB (#5590)
* support mongodb connector
* add tester
* add tester
* add tester
* add license header
* fix by some comments
* add jdbc source sink
* add init data
* fix code style and default port 27017 of mongodb
diff --git a/pulsar-io/debezium/mongodb/pom.xml b/pulsar-io/debezium/mongodb/pom.xml
new file mode 100644
index 0000000..f9250af
--- /dev/null
+++ b/pulsar-io/debezium/mongodb/pom.xml
@@ -0,0 +1,58 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-debezium</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-debezium-mongodb</artifactId>
+ <name>Pulsar IO :: Debezium :: mongodb</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-debezium-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mongodb</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
new file mode 100644
index 0000000..da21ab0
--- /dev/null
+++ b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium.mongodb;
+
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.io.debezium.DebeziumSource;
+
+import java.util.Map;
+
+/**
+ * A pulsar source that runs debezium mongodb source
+ */
+public class DebeziumMongoDbSource extends DebeziumSource {
+ static private final String DEFAULT_TASK = "io.debezium.connector.mongodb.MongoDbConnectorTask";
+
+ @Override
+ public void setDbConnectorTask(Map<String, Object> config) throws Exception {
+ throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK);
+ }
+}
diff --git a/pulsar-io/debezium/mongodb/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/debezium/mongodb/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..cc69bd0
--- /dev/null
+++ b/pulsar-io/debezium/mongodb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: debezium-mongodb
+description: Debezium MongoDb Source
+sourceClass: org.apache.pulsar.io.debezium.mongodb.DebeziumMongoDbSource
diff --git a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
new file mode 100644
index 0000000..af73516
--- /dev/null
+++ b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+tenant: "public"
+namespace: "default"
+name: "debezium-mongodb-source"
+topicName: "debezium-mongodb-topic"
+archive: "connectors/pulsar-io-debezium-mongodb-2.4.0-SNAPSHOT.nar"
+
+parallelism: 1
+
+configs:
+ ## config for pg, docker image: debezium/example-mongodb:0.8
+ mongodb.hosts: "rs0/mongodb:27017"
+ mongodb.name: "dbserver1"
+ mongodb.user: "debezium"
+ mongodb.password: "dbz"
+ mongodb.task.id: "1"
+ database.whitelist: "inventory"
+
+ ## PULSAR_SERVICE_URL_CONFIG
+ pulsar.service.url: "pulsar://127.0.0.1:6650"
\ No newline at end of file
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index a0a9840..158d43b 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -35,6 +35,7 @@
<module>core</module>
<module>mysql</module>
<module>postgres</module>
+ <module>mongodb</module>
</modules>
</project>