[Feature](cdc) add MongoDB CDC (#343)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index beaa5b9..23145e4 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -285,6 +285,19 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
+            <!-- the dependency is available only for stable releases. -->
+            <version>${flink.sql.cdc.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-shaded-guava</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime-web</artifactId>
             <version>${flink.version}</version>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 86745fa..3608e95 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -36,6 +36,7 @@
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -241,6 +242,32 @@
         return sb.toString();
     }
 
+    public Map<String, String> getTableFieldNames(String databaseName, String tableName) {
+        if (!databaseExists(databaseName)) {
+            throw new DorisRuntimeException("database " + databaseName + " does not exist");
+        }
+        String sql =
+                String.format(
+                        "SELECT COLUMN_NAME,DATA_TYPE "
+                                + "FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'",
+                        databaseName, tableName);
+
+        Map<String, String> columnValues = new HashMap<>();
+        try (PreparedStatement ps =
+                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String fieldName = rs.getString(1);
+                String dataType = rs.getString(2);
+                columnValues.put(fieldName, dataType);
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new DorisSystemException(
+                    String.format("The following SQL query could not be executed: %s", sql), e);
+        }
+    }
+
     private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
         String fieldType = field.getTypeString();
         if (isKey && DorisType.STRING.equals(fieldType)) {
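Reviewer note: a minimal sketch of how the new helper is meant to be used; the database/table names and the surrounding DorisOptions are placeholders, not part of this patch.

```java
DorisSystem dorisSystem = new DorisSystem(dorisOptions);
// Maps each column name to its Doris data type, e.g. {"_id" -> "VARCHAR", "age" -> "INT"}
Map<String, String> columns = dorisSystem.getTableFieldNames("test_db", "test_tbl");
```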
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java
new file mode 100644
index 0000000..b2ff598
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumSchemaChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+
+public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MongoDBJsonDebeziumSchemaSerializer.class);
+    private final Pattern pattern;
+    private final DorisOptions dorisOptions;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    // table name of the cdc upstream, format is db.tbl
+    private final String sourceTableName;
+    private String lineDelimiter = LINE_DELIMITER_DEFAULT;
+    private boolean ignoreUpdateBefore = true;
+    // <cdc db.schema.table, doris db.table>
+    private Map<String, String> tableMapping;
+    // create table properties
+    private Map<String, String> tableProperties;
+    private String targetDatabase;
+
+    private CdcDataChange dataChange;
+    private CdcSchemaChange schemaChange;
+
+    private String targetTablePrefix;
+    private String targetTableSuffix;
+
+    public MongoDBJsonDebeziumSchemaSerializer(
+            DorisOptions dorisOptions,
+            Pattern pattern,
+            String sourceTableName,
+            DorisExecutionOptions executionOptions,
+            Map<String, String> tableMapping,
+            Map<String, String> tableProperties,
+            String targetDatabase,
+            String targetTablePrefix,
+            String targetTableSuffix) {
+        this.dorisOptions = dorisOptions;
+        this.pattern = pattern;
+        this.sourceTableName = sourceTableName;
+        // Prevent loss of decimal data precision
+        this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+        JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+        this.objectMapper.setNodeFactory(jsonNodeFactory);
+        this.tableMapping = tableMapping;
+        this.tableProperties = tableProperties;
+        this.targetDatabase = targetDatabase;
+        this.targetTablePrefix = targetTablePrefix;
+        this.targetTableSuffix = targetTableSuffix;
+        if (executionOptions != null) {
+            this.lineDelimiter =
+                    executionOptions
+                            .getStreamLoadProp()
+                            .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+            this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
+        }
+        init();
+    }
+
+    private void init() {
+        JsonDebeziumChangeContext changeContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        sourceTableName,
+                        targetDatabase,
+                        tableProperties,
+                        objectMapper,
+                        pattern,
+                        lineDelimiter,
+                        ignoreUpdateBefore,
+                        targetTablePrefix,
+                        targetTableSuffix);
+        this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
+        this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
+    }
+
+    @Override
+    public DorisRecord serialize(String record) throws IOException {
+        LOG.debug("received debezium json data {} :", record);
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        String op = getOperateType(recordRoot);
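+        // Apply the schema change first: besides adding any missing columns, it
+        // re-parses the fullDocument string field and replaces it with a parsed
+        // object node that dataChange.serialize below consumes directly.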
+        try {
+            schemaChange.schemaChange(recordRoot);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return dataChange.serialize(record, recordRoot, op);
+    }
+
+    private String getOperateType(JsonNode recordRoot) {
+        return recordRoot.get("operationType").asText();
+    }
+
+    public static MongoDBJsonDebeziumSchemaSerializer.Builder builder() {
+        return new MongoDBJsonDebeziumSchemaSerializer.Builder();
+    }
+
+    public static class Builder {
+        private DorisOptions dorisOptions;
+        private Pattern addDropDDLPattern;
+        private String sourceTableName;
+        private DorisExecutionOptions executionOptions;
+        private Map<String, String> tableMapping;
+        private Map<String, String> tableProperties;
+        private String targetDatabase;
+        private String targetTablePrefix = "";
+        private String targetTableSuffix = "";
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
+                DorisOptions dorisOptions) {
+            this.dorisOptions = dorisOptions;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
+            this.addDropDDLPattern = addDropDDLPattern;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setSourceTableName(
+                String sourceTableName) {
+            this.sourceTableName = sourceTableName;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setExecutionOptions(
+                DorisExecutionOptions executionOptions) {
+            this.executionOptions = executionOptions;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTableMapping(
+                Map<String, String> tableMapping) {
+            this.tableMapping = tableMapping;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTableProperties(
+                Map<String, String> tableProperties) {
+            this.tableProperties = tableProperties;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetDatabase(
+                String targetDatabase) {
+            this.targetDatabase = targetDatabase;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer build() {
+            return new MongoDBJsonDebeziumSchemaSerializer(
+                    dorisOptions,
+                    addDropDDLPattern,
+                    sourceTableName,
+                    executionOptions,
+                    tableMapping,
+                    tableProperties,
+                    targetDatabase,
+                    targetTablePrefix,
+                    targetTableSuffix);
+        }
+    }
+}
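For context, this mirrors how MongoDBDatabaseSync#buildSchemaSerializer (later in this patch) constructs the serializer; the option objects here are placeholders:

```java
MongoDBJsonDebeziumSchemaSerializer serializer =
        MongoDBJsonDebeziumSchemaSerializer.builder()
                .setDorisOptions(dorisOptions)
                .setExecutionOptions(executionOptions)
                .setTableMapping(tableMapping)
                .setTableProperties(tableProperties)
                .setTargetDatabase("doris_db")
                .build();
```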
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
index c344aae..cba431c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
@@ -31,7 +31,7 @@
  */
 public abstract class CdcDataChange implements ChangeEvent {
 
-    protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
+    public abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
             throws IOException;
 
     protected abstract Map<String, Object> extractBeforeRow(JsonNode record);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java
new file mode 100644
index 0000000..407a7e7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
+import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.extractJsonNode;
+import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.getDorisTableIdentifier;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATA;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATABASE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DOCUMENT_KEY;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.ID_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OID_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_DELETE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_INSERT;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_REPLACE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_UPDATE;
+
+public class MongoJsonDebeziumDataChange extends CdcDataChange implements ChangeEvent {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumDataChange.class);
+
+    public DorisOptions dorisOptions;
+    public String lineDelimiter;
+    public JsonDebeziumChangeContext changeContext;
+    public ObjectMapper objectMapper;
+    public Map<String, String> tableMapping;
+
+    public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
+        this.changeContext = changeContext;
+        this.dorisOptions = changeContext.getDorisOptions();
+        this.objectMapper = changeContext.getObjectMapper();
+        this.lineDelimiter = changeContext.getLineDelimiter();
+        this.tableMapping = changeContext.getTableMapping();
+    }
+
+    @Override
+    public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
+        // Filter out table records that are not in tableMapping
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier =
+                getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
+        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+            LOG.warn(
+                    "filter table {}, because it is not listened, record detail is {}",
+                    cdcTableIdentifier,
+                    record);
+            return null;
+        }
+        Map<String, Object> valueMap;
+        switch (op) {
+            case OP_INSERT:
+            case OP_UPDATE:
+            case OP_REPLACE:
+                valueMap = extractAfterRow(recordRoot);
+                addDeleteSign(valueMap, false);
+                break;
+            case OP_DELETE:
+                valueMap = extractDeleteRow(recordRoot);
+                addDeleteSign(valueMap, true);
+                break;
+            default:
+                LOG.error("parse record fail, unknown op {} in {}", op, record);
+                return null;
+        }
+
+        return DorisRecord.of(
+                dorisTableIdentifier,
+                objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+    }
+
+    public String getCdcTableIdentifier(JsonNode record) {
+        if (record.get(FIELD_NAMESPACE) == null
+                || record.get(FIELD_NAMESPACE) instanceof NullNode) {
+            LOG.error("Failed to get cdc namespace");
+            throw new RuntimeException();
+        }
+        JsonNode nameSpace = record.get(FIELD_NAMESPACE);
+        String db = extractJsonNode(nameSpace, FIELD_DATABASE);
+        String table = extractJsonNode(nameSpace, FIELD_TABLE);
+        return SourceSchema.getString(db, null, table);
+    }
+
+    @Override
+    public Map<String, Object> extractBeforeRow(JsonNode record) {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
+        JsonNode dataNode = recordRoot.get(FIELD_DATA);
+        Map<String, Object> rowMap = extractRow(dataNode);
+        String objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+        rowMap.put(ID_FIELD, objectId);
+        return rowMap;
+    }
+
+    private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
+            throws JsonProcessingException {
+        String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY);
+        JsonNode jsonNode = objectMapper.readTree(documentKey);
+        String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD);
+        Map<String, Object> row = new HashMap<>();
+        row.put(ID_FIELD, objectId);
+        return row;
+    }
+
+    private Map<String, Object> extractRow(JsonNode recordRow) {
+        Map<String, Object> recordMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+}
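For reference, the rough shape of the change stream events this class consumes, reconstructed from the constants it reads (ids and names are illustrative). Note that fullDocument arrives as an embedded JSON string, which MongoJsonDebeziumSchemaChange replaces with a parsed node before serialization:

```json
{"operationType": "insert",
 "ns": {"db": "test_db", "coll": "test_coll"},
 "fullDocument": "{\"_id\": {\"$oid\": \"64f5e9a31a2b3c4d5e6f7a8b\"}, \"name\": \"doris\"}"}

{"operationType": "delete",
 "ns": {"db": "test_db", "coll": "test_coll"},
 "documentKey": "{\"_id\": {\"$oid\": \"64f5e9a31a2b3c4d5e6f7a8b\"}}"}
```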
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java
new file mode 100644
index 0000000..69dbf0b
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.getDorisTableIdentifier;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATA;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATABASE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
+
+public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumSchemaChange.class);
+
+    private final ObjectMapper objectMapper;
+
+    private final Map<String, Map<String, String>> tableFields;
+
+    private final SchemaChangeManager schemaChangeManager;
+
+    private final DorisSystem dorisSystem;
+
+    public Map<String, String> tableMapping;
+    private final DorisOptions dorisOptions;
+
+    private final Set<String> specialFields =
+            new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD, LONG_FIELD));
+
+    public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) {
+        this.objectMapper = changeContext.getObjectMapper();
+        this.dorisOptions = changeContext.getDorisOptions();
+        this.tableFields = new HashMap<>();
+        this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+        this.dorisSystem = new DorisSystem(dorisOptions);
+        this.tableMapping = changeContext.getTableMapping();
+    }
+
+    @Override
+    public String extractDatabase(JsonNode record) {
+        return null;
+    }
+
+    @Override
+    public String extractTable(JsonNode record) {
+        return null;
+    }
+
+    @Override
+    public boolean schemaChange(JsonNode recordRoot) throws IOException {
+        JsonNode logData = getFullDocument(recordRoot);
+        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+        String dorisTableIdentifier =
+                getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
+        String[] tableInfo = dorisTableIdentifier.split("\\.");
+        if (tableInfo.length != 2) {
+            throw new DorisRuntimeException(
+                    "Invalid doris table identifier: " + dorisTableIdentifier);
+        }
+        String dataBase = tableInfo[0];
+        String table = tableInfo[1];
+        // build the Doris table fields mapping for this record's target table
+        buildDorisTableFieldsMapping(dataBase, table);
+
+        // Compare the change stream fields against the cached Doris table fields;
+        // if any field is missing, perform a schema change to add it
+        checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
+        formatSpecialFieldData(logData);
+        ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
+        return true;
+    }
+
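+    /**
+     * Flattens Mongo extended-JSON wrappers into plain values, e.g.
+     * {"ts": {"$date": 1693899297000}} becomes a formatted datetime string,
+     * {"price": {"$numberDecimal": "12.34"}} becomes "12.34", and
+     * {"cnt": {"$numberLong": "42"}} becomes 42.
+     */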
+    private void formatSpecialFieldData(JsonNode logData) {
+        logData.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            JsonNode fieldNode = logData.get(fieldName);
+                            if (fieldNode.isObject() && fieldNode.size() == 1) {
+                                String fieldKey = fieldNode.fieldNames().next();
+                                if (specialFields.contains(fieldKey)) {
+                                    switch (fieldKey) {
+                                        case DATE_FIELD:
+                                            long timestamp = fieldNode.get(DATE_FIELD).asLong();
+                                            String formattedDate =
+                                                    MongoDateConverter.convertTimestampToString(
+                                                            timestamp);
+                                            ((ObjectNode) logData).put(fieldName, formattedDate);
+                                            break;
+                                        case DECIMAL_FIELD:
+                                            String numberDecimal =
+                                                    fieldNode.get(DECIMAL_FIELD).asText();
+                                            ((ObjectNode) logData).put(fieldName, numberDecimal);
+                                            break;
+
+                                        case LONG_FIELD:
+                                            long longField = fieldNode.get(LONG_FIELD).asLong();
+                                            ((ObjectNode) logData).put(fieldName, longField);
+                                            break;
+                                    }
+                                }
+                            }
+                        });
+    }
+
+    private JsonNode getFullDocument(JsonNode recordRoot) {
+        try {
+            return objectMapper.readTree(recordRoot.get(FIELD_DATA).asText());
+        } catch (IOException e) {
+            throw new DorisRuntimeException("Failed to parse fullDocument JSON", e);
+        }
+    }
+
+    private void checkAndUpdateSchemaChange(
+            JsonNode logData, String dorisTableIdentifier, String database, String table) {
+        Map<String, String> tableFieldMap = tableFields.get(dorisTableIdentifier);
+        logData.fieldNames()
+                .forEachRemaining(
+                        name -> {
+                            try {
+                                if (!tableFieldMap.containsKey(name)) {
+                                    doSchemaChange(name, logData, database, table);
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException("Error during schema change", e);
+                            }
+                        });
+    }
+
+    private void doSchemaChange(
+            String logFieldName, JsonNode logData, String database, String table)
+            throws IOException, IllegalArgumentException {
+        String dorisType = MongoDBType.jsonNodeToDorisType(logData.get(logFieldName));
+        schemaChangeManager.addColumn(
+                database, table, new FieldSchema(logFieldName, dorisType, null));
+        String identifier = database + "." + table;
+        tableFields.computeIfAbsent(identifier, k -> new HashMap<>()).put(logFieldName, dorisType);
+    }
+
+    private void buildDorisTableFieldsMapping(String databaseName, String tableName) {
+        String identifier = databaseName + "." + tableName;
+        tableFields.computeIfAbsent(
+                identifier, k -> dorisSystem.getTableFieldNames(databaseName, tableName));
+    }
+
+    @Override
+    public String getCdcTableIdentifier(JsonNode record) {
+        if (record.get(FIELD_NAMESPACE) == null
+                || record.get(FIELD_NAMESPACE) instanceof NullNode) {
+            LOG.error("Failed to get cdc namespace");
+            throw new RuntimeException();
+        }
+        JsonNode nameSpace = record.get(FIELD_NAMESPACE);
+        String table = nameSpace.get(FIELD_TABLE).asText();
+        String db = nameSpace.get(FIELD_DATABASE).asText();
+        return SourceSchema.getString(db, null, table);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index e1a01ab..194ef87 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -23,6 +23,7 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
 import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
@@ -40,6 +41,7 @@
     private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
     private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
     private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database";
+    private static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database";
     private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
 
     public static void main(String[] args) throws Exception {
@@ -59,6 +61,9 @@
             case SQLSERVER_SYNC_DATABASE:
                 createSqlServerSyncDatabase(opArgs);
                 break;
+            case MONGODB_SYNC_DATABASE:
+                createMongoDBSyncDatabase(opArgs);
+                break;
             default:
                 System.out.println("Unknown operation " + operation);
                 System.exit(1);
@@ -101,6 +106,15 @@
         syncDatabase(params, databaseSync, postgresConfig, "SqlServer");
     }
 
+    private static void createMongoDBSyncDatabase(String[] opArgs) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Preconditions.checkArgument(params.has("mongodb-conf"));
+        Map<String, String> mongoMap = getConfigMap(params, "mongodb-conf");
+        Configuration mongoConfig = Configuration.fromMap(mongoMap);
+        DatabaseSync databaseSync = new MongoDBDatabaseSync();
+        syncDatabase(params, databaseSync, mongoConfig, "mongodb");
+    }
+
     private static void syncDatabase(
             MultipleParameterTool params,
             DatabaseSync databaseSync,
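Reviewer note: a hedged sketch of driving the new branch through CdcTools#main; the flags mirror createMongoDBSyncDatabase above and the config keys defined in MongoDBDatabaseSync later in this patch, with sink-side options omitted:

```java
// Hypothetical invocation; all values are placeholders.
String[] args = {
    "mongodb-sync-database",
    "--mongodb-conf", "hosts=127.0.0.1:27017",
    "--mongodb-conf", "username=flinkuser",
    "--mongodb-conf", "password=flinkpwd",
    "--mongodb-conf", "database=test_db",
    "--mongodb-conf", "scan.startup.mode=initial",
    "--mongodb-conf", "schema.sample-percent=0.2",
    "--mongodb-conf", "table-name=orders"
};
CdcTools.main(args);
```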
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 14d3fbb..632edcc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -82,7 +82,7 @@
     protected String tablePrefix;
     protected String tableSuffix;
     protected boolean singleSink;
-    private final Map<String, String> tableMapping = new HashMap<>();
+    protected final Map<String, String> tableMapping = new HashMap<>();
 
     public abstract void registerDriver() throws SQLException;
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
new file mode 100644
index 0000000..f8772c9
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import java.io.Serializable;
+
+public class ChangeStreamConstant implements Serializable {
+    private static final long serialVersionUID = 2599456667907755804L;
+    public static final String ID_FIELD = "_id";
+    public static final String OID_FIELD = "$oid";
+    public static final String FIELD_TYPE = "operationType";
+    public static final String FIELD_DATA = "fullDocument";
+    public static final String OP_UPDATE = "update";
+    public static final String OP_INSERT = "insert";
+    public static final String OP_REPLACE = "replace";
+    public static final String OP_DELETE = "delete";
+    public static final String FIELD_DATABASE = "db";
+    public static final String FIELD_TABLE = "coll";
+    public static final String FIELD_NAMESPACE = "ns";
+    public static final String FIELD_DOCUMENT_KEY = "documentKey";
+
+    public static final String DATE_FIELD = "$date";
+
+    public static final String DECIMAL_FIELD = "$numberDecimal";
+
+    public static final String LONG_FIELD = "$numberLong";
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
new file mode 100644
index 0000000..e2e0023
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.serializer.MongoDBJsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.bson.Document;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class MongoDBDatabaseSync extends DatabaseSync {
+
+    private static final String INITIAL_MODE = "initial";
+    private static final String LATEST_OFFSET_MODE = "latest-offset";
+    private static final String TIMESTAMP_MODE = "timestamp";
+    public static final ConfigOption<Double> MONGO_CDC_CREATE_SAMPLE_PERCENT =
+            ConfigOptions.key("schema.sample-percent")
+                    .doubleType()
+                    .defaultValue(0.2)
+                    .withDescription("mongo cdc sample percent");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Table name of the Mongo database to monitor.");
+
+    public MongoDBDatabaseSync() throws SQLException {}
+
+    @Override
+    public void registerDriver() throws SQLException {}
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public List<SourceSchema> getSchemaList() throws Exception {
+        String databaseName = config.get(MongoDBSourceOptions.DATABASE);
+        List<SourceSchema> schemaList = new ArrayList<>();
+        MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
+
+        settingsBuilder.applyConnectionString(
+                new ConnectionString(
+                        buildConnectionString(
+                                config.get(MongoDBSourceOptions.USERNAME),
+                                config.get(MongoDBSourceOptions.PASSWORD),
+                                config.get(MongoDBSourceOptions.SCHEME),
+                                config.get(MongoDBSourceOptions.HOSTS),
+                                config.get(MongoDBSourceOptions.CONNECTION_OPTIONS))));
+
+        MongoClientSettings settings = settingsBuilder.build();
+        Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
+        try (MongoClient mongoClient = MongoClients.create(settings)) {
+            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
+            MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
+            for (String collectionName : collectionNames) {
+                if (!isSyncNeeded(collectionName)) {
+                    continue;
+                }
+                MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
+                Document firstDocument = collection.find().first();
+                if (firstDocument == null) {
+                    throw new IllegalStateException(
+                            "Unable to infer schema: collection " + collectionName + " is empty");
+                }
+
+                long totalDocuments = collection.countDocuments();
+                long sampleSize = (long) Math.ceil(totalDocuments * samplePercent);
+                ArrayList<Document> documents = sampleData(collection, sampleSize);
+                MongoDBSchema mongoDBSchema =
+                        new MongoDBSchema(documents, databaseName, collectionName, null);
+                mongoDBSchema.setModel(DataModel.UNIQUE);
+                schemaList.add(mongoDBSchema);
+            }
+        }
+
+        return schemaList;
+    }
+
+    private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) {
+        ArrayList<Document> query = new ArrayList<>();
+        query.add(new Document("$sample", new Document("size", sampleNum)));
+        return collection.aggregate(query).into(new ArrayList<>());
+    }
+
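+    // e.g. buildConnectionString("user", "pwd", "mongodb", "host1:27017,host2:27017",
+    // "replicaSet=rs0") -> "mongodb://user:pwd@host1:27017,host2:27017/?replicaSet=rs0"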
+    private static String buildConnectionString(
+            @Nullable String username,
+            @Nullable String password,
+            String scheme,
+            String hosts,
+            @Nullable String connectionOptions) {
+        StringBuilder sb = new StringBuilder(scheme).append("://");
+        if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+            sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
+        }
+        sb.append(checkNotNull(hosts));
+        if (StringUtils.isNotEmpty(connectionOptions)) {
+            sb.append("/?").append(connectionOptions);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+        String hosts = config.get(MongoDBSourceOptions.HOSTS);
+        String username = config.get(MongoDBSourceOptions.USERNAME);
+        String password = config.get(MongoDBSourceOptions.PASSWORD);
+        String database = config.get(MongoDBSourceOptions.DATABASE);
+        String collection = config.get(MongoDBSourceOptions.COLLECTION);
+        if (StringUtils.isBlank(collection)) {
+            collection = config.get(TABLE_NAME);
+        }
+        MongoDBSourceBuilder<String> mongoDBSourceBuilder = MongoDBSource.builder();
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        mongoDBSourceBuilder
+                .hosts(hosts)
+                .username(username)
+                .password(password)
+                .databaseList(database)
+                .collectionList(collection);
+
+        String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
+        switch (startupMode.toLowerCase()) {
+            case INITIAL_MODE:
+                mongoDBSourceBuilder.startupOptions(StartupOptions.initial());
+                break;
+            case LATEST_OFFSET_MODE:
+                mongoDBSourceBuilder.startupOptions(StartupOptions.latest());
+                break;
+            case TIMESTAMP_MODE:
+                mongoDBSourceBuilder.startupOptions(
+                        StartupOptions.timestamp(
+                                config.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
+        }
+        MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
+        return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
+    }
+
+    @Override
+    public ParsingProcessFunction buildProcessFunction() {
+        return new MongoParsingProcessFunction(converter);
+    }
+
+    @Override
+    public DorisRecordSerializer<String> buildSchemaSerializer(
+            DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) {
+        return MongoDBJsonDebeziumSchemaSerializer.builder()
+                .setDorisOptions(dorisBuilder.build())
+                .setExecutionOptions(executionOptions)
+                .setTableMapping(tableMapping)
+                .setTableProperties(tableConfig)
+                .setTargetDatabase(database)
+                .build();
+    }
+
+    @Override
+    public String getTableListPrefix() {
+        return config.get(MongoDBSourceOptions.DATABASE);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
new file mode 100644
index 0000000..2c2e1b4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class MongoDBSchema extends SourceSchema {
+
+    public MongoDBSchema(
+            ArrayList<Document> sampleData,
+            String databaseName,
+            String tableName,
+            String tableComment)
+            throws Exception {
+        super(databaseName, null, tableName, tableComment);
+        fields = new LinkedHashMap<>();
+        for (Document data : sampleData) {
+            processSampleData(data);
+        }
+
+        primaryKeys = new ArrayList<>();
+        primaryKeys.add("_id");
+    }
+
+    private void processSampleData(Document sampleData) {
+        for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
+            String fieldName = entry.getKey();
+            Object value = entry.getValue();
+            String dorisType = MongoDBType.toDorisType(value);
+            if (isDecimalField(fieldName)) {
+                dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
+            }
+            fields.put(fieldName, new FieldSchema(fieldName, dorisType, null));
+        }
+    }
+
+    private boolean isDecimalField(String fieldName) {
+        FieldSchema existingField = fields.get(fieldName);
+        return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL);
+    }
+
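+    // Widen the existing DECIMAL so both observed shapes fit: e.g. merging
+    // DECIMAL(5,2) with DECIMAL(4,3) keeps max integer digits (3) and max
+    // scale (3), producing DECIMAL(6,3).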
+    private String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) {
+        FieldSchema existingField = fields.get(fieldName);
+        if (existingField.getTypeString().startsWith(DorisType.DECIMAL)
+                && newDorisType.startsWith(DorisType.DECIMAL)) {
+            Tuple2<Integer, Integer> existingPrecisionAndScale =
+                    MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString());
+            int existingPrecision = existingPrecisionAndScale.f0;
+            int existingScale = existingPrecisionAndScale.f1;
+
+            Tuple2<Integer, Integer> currentPrecisionAndScale =
+                    MongoDBType.getDecimalPrecisionAndScale(newDorisType);
+            int currentPrecision = currentPrecisionAndScale.f0;
+            int currentScale = currentPrecisionAndScale.f1;
+
+            int newScale = Math.max(existingScale, currentScale);
+            int newIntegerPartSize =
+                    Math.max(existingPrecision - existingScale, currentPrecision - currentScale);
+            int newPrecision = newIntegerPartSize + newScale;
+
+            return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")";
+        }
+        return newDorisType;
+    }
+
+    @Override
+    public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+        return null;
+    }
+
+    @Override
+    public String getCdcTableName() {
+        return databaseName + "\\." + tableName;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
new file mode 100644
index 0000000..bee85ce
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.bson.BsonArray;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MongoDBType {
+
+    public static final String DATE_TYPE = "$date";
+    public static final String DECIMAL_TYPE = "$numberDecimal";
+    public static final String LONG_TYPE = "$numberLong";
+
+    public static String toDorisType(Object value) {
+        if (value instanceof Integer) {
+            return DorisType.INT;
+        } else if (value instanceof Date) {
+            return DorisType.DATETIME_V2 + "(3)";
+        } else if (value instanceof Long) {
+            return DorisType.BIGINT;
+        } else if (value instanceof Double) {
+            return checkAndRebuildBigDecimal(new BigDecimal(String.valueOf(value)));
+        } else if (value instanceof Boolean) {
+            return DorisType.BOOLEAN;
+        } else if (value instanceof String) {
+            return DorisType.STRING;
+        } else if (value instanceof ObjectId) {
+            return DorisType.VARCHAR + "(30)";
+        } else if (value instanceof BsonArray) {
+            return DorisType.ARRAY;
+        } else if (value instanceof Decimal128) {
+            return checkAndRebuildBigDecimal(((Decimal128) value).bigDecimalValue());
+        } else {
+            return DorisType.STRING;
+        }
+    }
+
+    public static String jsonNodeToDorisType(JsonNode value) {
+        if (value instanceof IntNode) {
+            return DorisType.INT;
+        } else if (value instanceof TextNode) {
+            return DorisType.STRING;
+        } else if (value instanceof LongNode) {
+            return DorisType.BIGINT;
+        } else if (value instanceof DoubleNode) {
+            return DorisType.DOUBLE;
+        } else if (value instanceof BooleanNode) {
+            return DorisType.BOOLEAN;
+        } else if (value instanceof ArrayNode) {
+            return DorisType.ARRAY;
+        } else if (value instanceof DecimalNode) {
+            return checkAndRebuildBigDecimal(value.decimalValue());
+        } else if (value instanceof ObjectNode) {
+            if (value.size() == 1 && value.get(DATE_TYPE) != null) {
+                return DorisType.DATETIME_V2 + "(3)";
+            } else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) {
+                return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_TYPE).asText()));
+            } else if (value.size() == 1 && value.get(LONG_TYPE) != null) {
+                return DorisType.BIGINT;
+            } else {
+                return DorisType.STRING;
+            }
+        } else {
+            return DorisType.STRING;
+        }
+    }
+
+    public static Tuple2<Integer, Integer> getDecimalPrecisionAndScale(String decimalString) {
+        // Regular expression matching the two numbers in parentheses, e.g. DECIMAL(27,9)
+        String regex = "\\((\\d+),(\\d+)\\)";
+        Pattern pattern = Pattern.compile(regex);
+        Matcher matcher = pattern.matcher(decimalString);
+
+        if (matcher.find()) {
+            Integer precision = Integer.parseInt(matcher.group(1));
+            Integer scale = Integer.parseInt(matcher.group(2));
+            return new Tuple2<>(precision, scale);
+        }
+        throw new DorisRuntimeException("Get Decimal precision and Scale error !");
+    }
+
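+    // e.g. 12.345 (precision 5, scale 3) maps to DorisType.DECIMAL_V3 + "(5,3)";
+    // values whose precision exceeds 38 fall back to STRING, and a negative scale
+    // such as 1E+3 is first normalized via toPlainString().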
+    public static String checkAndRebuildBigDecimal(BigDecimal decimal) {
+        if (decimal.scale() < 0) {
+            decimal = new BigDecimal(decimal.toPlainString());
+        }
+        return decimal.precision() <= 38
+                ? String.format(
+                        "%s(%s,%s)",
+                        DorisType.DECIMAL_V3, decimal.precision(), Math.max(decimal.scale(), 0))
+                : DorisType.STRING;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java
new file mode 100644
index 0000000..b7a3ec7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+public class MongoDateConverter {
+    // DateTimeFormatter is immutable and thread-safe, so a single shared instance suffices.
+    private static final DateTimeFormatter FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
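+    // The input is epoch milliseconds. Note the zone is hard-coded to Asia/Shanghai,
+    // so timestamps are rendered in that zone regardless of the JVM default.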
+    public static String convertTimestampToString(long timestamp) {
+        Instant instant = Instant.ofEpochMilli(timestamp);
+        LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"));
+        return FORMATTER.format(localDateTime);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
new file mode 100644
index 0000000..737617a
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.tools.cdc.DatabaseSync.TableNameConverter;
+import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoParsingProcessFunction extends ParsingProcessFunction {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoParsingProcessFunction.class);
+
+    public MongoParsingProcessFunction(TableNameConverter converter) {
+        super(converter);
+    }
+
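+    // MongoDB CDC records carry their namespace in an "ns" field ({"db": ..., "coll": ...});
+    // the collection name becomes the sink table name.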
+    @Override
+    protected String getRecordTableName(String record) throws Exception {
+        JsonNode jsonNode = objectMapper.readValue(record, JsonNode.class);
+        JsonNode nameSpace = jsonNode.get("ns");
+        if (nameSpace == null || nameSpace instanceof NullNode) {
+            LOG.error("Failed to get cdc namespace from record: {}", record);
+            throw new RuntimeException("Failed to get cdc namespace");
+        }
+        return extractJsonNode(nameSpace, "coll");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
new file mode 100644
index 0000000..ef66e88
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcMongoSyncDatabaseCase {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // To debug locally with the Flink web UI, use instead:
+        //        Configuration conf = new Configuration();
+        //        conf.setString(RestOptions.BIND_PORT, "8018");
+        //        conf.setString("rest.flamegraph.enabled", "true");
+        //        StreamExecutionEnvironment env =
+        //                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+        Map<String, String> flinkMap = new HashMap<>();
+        flinkMap.put("execution.checkpointing.interval", "10s");
+        flinkMap.put("pipeline.operator-chaining", "false");
+        flinkMap.put("parallelism.default", "8");
+
+        String database = "cdc_test";
+        String tablePrefix = "";
+        String tableSuffix = "";
+
+        Configuration configuration = Configuration.fromMap(flinkMap);
+        env.configure(configuration);
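+        // MongoDB CDC source options; key names follow the flink-sql-connector-mongodb-cdc
+        // connector ("hosts", "scan.startup.mode", ...) plus a schema sampling option
+        // used for type inference.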
+        Map<String, String> mongoConfig = new HashMap<>();
+        mongoConfig.put("database", "test");
+        mongoConfig.put("hosts", "127.0.0.1:27017");
+        mongoConfig.put("username", "flinkuser");
+        mongoConfig.put("password", "flinkpwd");
+        //        mongoConfig.put("scan.startup.mode", "latest-offset");
+        mongoConfig.put("scan.startup.mode", "initial");
+        mongoConfig.put("schema.sample-percent", "1");
+        Configuration config = Configuration.fromMap(mongoConfig);
+
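+        // Doris sink options: FE HTTP address, JDBC port for metadata queries, and a
+        // random label prefix so stream-load labels from reruns do not collide.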
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", "127.0.0.1:8030");
+        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
+        sinkConfig.put("username", "root");
+        sinkConfig.put("password", "");
+        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        sinkConfig.put("auto-redirect", "false");
+        //        sinkConfig.put("sink.enable.batch-mode","true");
+        //        sinkConfig.put("sink.write-mode","stream_load_batch");
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
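+        // Properties for auto-created Doris tables; "table-buckets" maps a table-name regex
+        // to a bucket count (here every table gets 1 bucket).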
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+        tableConfig.put("table-buckets", ".*:1");
+        String includingTables = "cdc_test";
+        //        String includingTables = "a_.*|b_.*|c";
+        String excludingTables = "";
+        String multiToOneOrigin = "a_.*|b_.*";
+        String multiToOneTarget = "a|b";
+        boolean ignoreDefaultValue = false;
+        //        boolean useNewSchemaChange = false;
+        DatabaseSync databaseSync = new MongoDBDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                //                .setSingleSink(true)
+                //                .setNewSchemaChange(useNewSchemaChange)
+                .create();
+        databaseSync.build();
+        env.execute(String.format("Mongo-Doris Database Sync: %s", database));
+    }
+}