[Feature](cdc) add MongoDB cdc (#343)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index beaa5b9..23145e4 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -285,6 +285,19 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
+ <!-- the dependency is available only for stable releases. -->
+ <version>${flink.sql.cdc.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>flink-shaded-guava</artifactId>
+ <groupId>org.apache.flink</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 86745fa..3608e95 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -36,6 +36,7 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -241,6 +242,32 @@
return sb.toString();
}
+ public Map<String, String> getTableFieldNames(String databaseName, String tableName) {
+ if (!databaseExists(databaseName)) {
+ throw new DorisRuntimeException("database" + databaseName + " is not exists");
+ }
+ String sql =
+ String.format(
+ "SELECT COLUMN_NAME,DATA_TYPE "
+ + "FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'",
+ databaseName, tableName);
+
+ Map<String, String> columnValues = new HashMap<>();
+ try (PreparedStatement ps =
+ jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String filedName = rs.getString(1);
+ String datatype = rs.getString(2);
+ columnValues.put(filedName, datatype);
+ }
+ return columnValues;
+ } catch (Exception e) {
+ throw new DorisSystemException(
+ String.format("The following SQL query could not be executed: %s", sql), e);
+ }
+ }
+
private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java
new file mode 100644
index 0000000..b2ff598
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumDataChange;
+import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumSchemaChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+
+public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MongoDBJsonDebeziumSchemaSerializer.class);
+ private final Pattern pattern;
+ private final DorisOptions dorisOptions;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ // table name of the cdc upstream, format is db.tbl
+ private final String sourceTableName;
+ private String lineDelimiter = LINE_DELIMITER_DEFAULT;
+ private boolean ignoreUpdateBefore = true;
+ // <cdc db.schema.table, doris db.table>
+ private Map<String, String> tableMapping;
+ // create table properties
+ private Map<String, String> tableProperties;
+ private String targetDatabase;
+
+ private CdcDataChange dataChange;
+ private CdcSchemaChange schemaChange;
+
+ private String targetTablePrefix;
+ private String targetTableSuffix;
+
+ public MongoDBJsonDebeziumSchemaSerializer(
+ DorisOptions dorisOptions,
+ Pattern pattern,
+ String sourceTableName,
+ DorisExecutionOptions executionOptions,
+ Map<String, String> tableMapping,
+ Map<String, String> tableProperties,
+ String targetDatabase,
+ String targetTablePrefix,
+ String targetTableSuffix) {
+ this.dorisOptions = dorisOptions;
+ this.pattern = pattern;
+ this.sourceTableName = sourceTableName;
+ // Prevent loss of decimal data precision
+ this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
+ JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+ this.objectMapper.setNodeFactory(jsonNodeFactory);
+ this.tableMapping = tableMapping;
+ this.tableProperties = tableProperties;
+ this.targetDatabase = targetDatabase;
+ this.targetTablePrefix = targetTablePrefix;
+ this.targetTableSuffix = targetTableSuffix;
+ if (executionOptions != null) {
+ this.lineDelimiter =
+ executionOptions
+ .getStreamLoadProp()
+ .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+ this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
+ }
+ init();
+ }
+
+ private void init() {
+ JsonDebeziumChangeContext changeContext =
+ new JsonDebeziumChangeContext(
+ dorisOptions,
+ tableMapping,
+ sourceTableName,
+ targetDatabase,
+ tableProperties,
+ objectMapper,
+ pattern,
+ lineDelimiter,
+ ignoreUpdateBefore,
+ targetTablePrefix,
+ targetTableSuffix);
+ this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
+ this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
+ }
+
+ @Override
+ public DorisRecord serialize(String record) throws IOException {
+ LOG.debug("received debezium json data {} :", record);
+ JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+ String op = getOperateType(recordRoot);
+ try {
+ schemaChange.schemaChange(recordRoot);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return dataChange.serialize(record, recordRoot, op);
+ }
+
+ private String getOperateType(JsonNode recordRoot) {
+ return recordRoot.get("operationType").asText();
+ }
+
+ public static MongoDBJsonDebeziumSchemaSerializer.Builder builder() {
+ return new MongoDBJsonDebeziumSchemaSerializer.Builder();
+ }
+
+ public static class Builder {
+ private DorisOptions dorisOptions;
+ private Pattern addDropDDLPattern;
+ private String sourceTableName;
+ private DorisExecutionOptions executionOptions;
+ private Map<String, String> tableMapping;
+ private Map<String, String> tableProperties;
+ private String targetDatabase;
+ private String targetTablePrefix = "";
+ private String targetTableSuffix = "";
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
+ DorisOptions dorisOptions) {
+ this.dorisOptions = dorisOptions;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
+ this.addDropDDLPattern = addDropDDLPattern;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setSourceTableName(
+ String sourceTableName) {
+ this.sourceTableName = sourceTableName;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setExecutionOptions(
+ DorisExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setTableMapping(
+ Map<String, String> tableMapping) {
+ this.tableMapping = tableMapping;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setTableProperties(
+ Map<String, String> tableProperties) {
+ this.tableProperties = tableProperties;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetDatabase(
+ String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ return this;
+ }
+
+ public MongoDBJsonDebeziumSchemaSerializer build() {
+ return new MongoDBJsonDebeziumSchemaSerializer(
+ dorisOptions,
+ addDropDDLPattern,
+ sourceTableName,
+ executionOptions,
+ tableMapping,
+ tableProperties,
+ targetDatabase,
+ targetTablePrefix,
+ targetTableSuffix);
+ }
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
index c344aae..cba431c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java
@@ -31,7 +31,7 @@
*/
public abstract class CdcDataChange implements ChangeEvent {
- protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
+ public abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
throws IOException;
protected abstract Map<String, Object> extractBeforeRow(JsonNode record);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java
new file mode 100644
index 0000000..407a7e7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.ChangeEvent;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
+import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.extractJsonNode;
+import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.getDorisTableIdentifier;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATA;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATABASE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DOCUMENT_KEY;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.ID_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OID_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_DELETE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_INSERT;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_REPLACE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_UPDATE;
+
+public class MongoJsonDebeziumDataChange extends CdcDataChange implements ChangeEvent {
+ private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumDataChange.class);
+
+ public DorisOptions dorisOptions;
+ public String lineDelimiter;
+ public JsonDebeziumChangeContext changeContext;
+ public ObjectMapper objectMapper;
+ public Map<String, String> tableMapping;
+
+ public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
+ this.changeContext = changeContext;
+ this.dorisOptions = changeContext.getDorisOptions();
+ this.objectMapper = changeContext.getObjectMapper();
+ this.lineDelimiter = changeContext.getLineDelimiter();
+ this.tableMapping = changeContext.getTableMapping();
+ }
+
+ @Override
+ public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
+ // Filter out table records that are not in tableMapping
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier =
+ getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
+ if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+ LOG.warn(
+ "filter table {}, because it is not listened, record detail is {}",
+ cdcTableIdentifier,
+ record);
+ return null;
+ }
+ Map<String, Object> valueMap;
+ switch (op) {
+ case OP_INSERT:
+ case OP_UPDATE:
+ case OP_REPLACE:
+ valueMap = extractAfterRow(recordRoot);
+ addDeleteSign(valueMap, false);
+ break;
+ case OP_DELETE:
+ valueMap = extractDeleteRow(recordRoot);
+ addDeleteSign(valueMap, true);
+ break;
+ default:
+ LOG.error("parse record fail, unknown op {} in {}", op, record);
+ return null;
+ }
+
+ return DorisRecord.of(
+ dorisTableIdentifier,
+ objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8));
+ }
+
+ public String getCdcTableIdentifier(JsonNode record) {
+ if (record.get(FIELD_NAMESPACE) == null
+ || record.get(FIELD_NAMESPACE) instanceof NullNode) {
+ LOG.error("Failed to get cdc namespace");
+ throw new RuntimeException();
+ }
+ JsonNode nameSpace = record.get(FIELD_NAMESPACE);
+ String db = extractJsonNode(nameSpace, FIELD_DATABASE);
+ String table = extractJsonNode(nameSpace, FIELD_TABLE);
+ return SourceSchema.getString(db, null, table);
+ }
+
+ @Override
+ public Map<String, Object> extractBeforeRow(JsonNode record) {
+ return null;
+ }
+
+ @Override
+ public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
+ JsonNode dataNode = recordRoot.get(FIELD_DATA);
+ Map<String, Object> rowMap = extractRow(dataNode);
+ String objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+ rowMap.put(ID_FIELD, objectId);
+ return rowMap;
+ }
+
+ private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
+ throws JsonProcessingException {
+ String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY);
+ JsonNode jsonNode = objectMapper.readTree(documentKey);
+ String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD);
+ Map<String, Object> row = new HashMap<>();
+ row.put(ID_FIELD, objectId);
+ return row;
+ }
+
+ private Map<String, Object> extractRow(JsonNode recordRow) {
+ Map<String, Object> recordMap =
+ objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
+ return recordMap != null ? recordMap : new HashMap<>();
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java
new file mode 100644
index 0000000..69dbf0b
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.getDorisTableIdentifier;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATA;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATABASE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
+import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
+
+public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumSchemaChange.class);
+
+ private final ObjectMapper objectMapper;
+
+ private final Map<String, Map<String, String>> tableFields;
+
+ private final SchemaChangeManager schemaChangeManager;
+
+ private final DorisSystem dorisSystem;
+
+ public Map<String, String> tableMapping;
+ private final DorisOptions dorisOptions;
+
+ private final Set<String> specialFields =
+ new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD, LONG_FIELD));
+
+ public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) {
+ this.objectMapper = changeContext.getObjectMapper();
+ this.dorisOptions = changeContext.getDorisOptions();
+ this.tableFields = new HashMap<>();
+ this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
+ this.dorisSystem = new DorisSystem(dorisOptions);
+ this.tableMapping = changeContext.getTableMapping();
+ }
+
+ @Override
+ public String extractDatabase(JsonNode record) {
+ return null;
+ }
+
+ @Override
+ public String extractTable(JsonNode record) {
+ return null;
+ }
+
+ @Override
+ public boolean schemaChange(JsonNode recordRoot) throws IOException {
+ JsonNode logData = getFullDocument(recordRoot);
+ String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+ String dorisTableIdentifier =
+ getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
+ String[] tableInfo = dorisTableIdentifier.split("\\.");
+ if (tableInfo.length != 2) {
+ throw new DorisRuntimeException();
+ }
+ String dataBase = tableInfo[0];
+ String table = tableInfo[1];
+ // build table fields mapping for all record
+ buildDorisTableFieldsMapping(dataBase, table);
+
+ // Determine whether change stream log and tableField are exactly the same, if not, perform
+ // schema change
+ checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
+ formatSpecialFieldData(logData);
+ ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
+ return true;
+ }
+
+ private void formatSpecialFieldData(JsonNode logData) {
+ logData.fieldNames()
+ .forEachRemaining(
+ fieldName -> {
+ JsonNode fieldNode = logData.get(fieldName);
+ if (fieldNode.isObject() && fieldNode.size() == 1) {
+ String fieldKey = fieldNode.fieldNames().next();
+ if (specialFields.contains(fieldKey)) {
+ switch (fieldKey) {
+ case DATE_FIELD:
+ long timestamp = fieldNode.get(DATE_FIELD).asLong();
+ String formattedDate =
+ MongoDateConverter.convertTimestampToString(
+ timestamp);
+ ((ObjectNode) logData).put(fieldName, formattedDate);
+ break;
+ case DECIMAL_FIELD:
+ String numberDecimal =
+ fieldNode.get(DECIMAL_FIELD).asText();
+ ((ObjectNode) logData).put(fieldName, numberDecimal);
+ break;
+
+ case LONG_FIELD:
+ long longFiled = fieldNode.get(LONG_FIELD).asLong();
+ ((ObjectNode) logData).put(fieldName, longFiled);
+ break;
+ }
+ }
+ }
+ });
+ }
+
+ private JsonNode getFullDocument(JsonNode recordRoot) {
+ try {
+ return objectMapper.readTree(recordRoot.get(FIELD_DATA).asText());
+ } catch (IOException e) {
+ throw new DorisRuntimeException("Failed to parse fullDocument JSON", e);
+ }
+ }
+
+ private void checkAndUpdateSchemaChange(
+ JsonNode logData, String dorisTableIdentifier, String database, String table) {
+ Map<String, String> tableFieldMap = tableFields.get(dorisTableIdentifier);
+ logData.fieldNames()
+ .forEachRemaining(
+ name -> {
+ try {
+ if (!tableFieldMap.containsKey(name)) {
+ doSchemaChange(name, logData, database, table);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Error during schema change", e);
+ }
+ });
+ }
+
+ private void doSchemaChange(
+ String logFieldName, JsonNode logData, String database, String table)
+ throws IOException, IllegalArgumentException {
+ String dorisType = MongoDBType.jsonNodeToDorisType(logData.get(logFieldName));
+ schemaChangeManager.addColumn(
+ database, table, new FieldSchema(logFieldName, dorisType, null));
+ String identifier = database + "." + table;
+ tableFields.computeIfAbsent(identifier, k -> new HashMap<>()).put(logFieldName, dorisType);
+ }
+
+ private void buildDorisTableFieldsMapping(String databaseName, String tableName) {
+ String identifier = databaseName + "." + tableName;
+ tableFields.computeIfAbsent(
+ identifier, k -> dorisSystem.getTableFieldNames(databaseName, tableName));
+ }
+
+ @Override
+ public String getCdcTableIdentifier(JsonNode record) {
+ if (record.get(FIELD_NAMESPACE) == null
+ || record.get(FIELD_NAMESPACE) instanceof NullNode) {
+ LOG.error("Failed to get cdc namespace");
+ throw new RuntimeException();
+ }
+ JsonNode nameSpace = record.get(FIELD_NAMESPACE);
+ String table = nameSpace.get(FIELD_TABLE).asText();
+ String db = nameSpace.get(FIELD_DATABASE).asText();
+ return SourceSchema.getString(db, null, table);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index e1a01ab..194ef87 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -23,6 +23,7 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
@@ -40,6 +41,7 @@
private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database";
+ private static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database";
private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
public static void main(String[] args) throws Exception {
@@ -59,6 +61,9 @@
case SQLSERVER_SYNC_DATABASE:
createSqlServerSyncDatabase(opArgs);
break;
+ case MONGODB_SYNC_DATABASE:
+ createMongoDBSyncDatabase(opArgs);
+ break;
default:
System.out.println("Unknown operation " + operation);
System.exit(1);
@@ -101,6 +106,15 @@
syncDatabase(params, databaseSync, postgresConfig, "SqlServer");
}
+ private static void createMongoDBSyncDatabase(String[] opArgs) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Preconditions.checkArgument(params.has("mongodb-conf"));
+ Map<String, String> mongoMap = getConfigMap(params, "mongodb-conf");
+ Configuration mongoConfig = Configuration.fromMap(mongoMap);
+ DatabaseSync databaseSync = new MongoDBDatabaseSync();
+ syncDatabase(params, databaseSync, mongoConfig, "mongodb");
+ }
+
private static void syncDatabase(
MultipleParameterTool params,
DatabaseSync databaseSync,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 14d3fbb..632edcc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -82,7 +82,7 @@
protected String tablePrefix;
protected String tableSuffix;
protected boolean singleSink;
- private final Map<String, String> tableMapping = new HashMap<>();
+ protected final Map<String, String> tableMapping = new HashMap<>();
public abstract void registerDriver() throws SQLException;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
new file mode 100644
index 0000000..f8772c9
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import java.io.Serializable;
+
+public class ChangeStreamConstant implements Serializable {
+ private static final long serialVersionUID = 2599456667907755804L;
+ public static final String ID_FIELD = "_id";
+ public static final String OID_FIELD = "$oid";
+ public static final String FIELD_TYPE = "operationType";
+ public static final String FIELD_DATA = "fullDocument";
+ public static final String OP_UPDATE = "update";
+ public static final String OP_INSERT = "insert";
+ public static final String OP_REPLACE = "replace";
+ public static final String OP_DELETE = "delete";
+ public static final String FIELD_DATABASE = "db";
+ public static final String FIELD_TABLE = "coll";
+ public static final String FIELD_NAMESPACE = "ns";
+ public static final String FIELD_DOCUMENT_KEY = "documentKey";
+
+ public static final String DATE_FIELD = "$date";
+
+ public static final String DECIMAL_FIELD = "$numberDecimal";
+
+ public static final String LONG_FIELD = "$numberLong";
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
new file mode 100644
index 0000000..e2e0023
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.mongodb.ConnectionString;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
+import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.serializer.MongoDBJsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.bson.Document;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class MongoDBDatabaseSync extends DatabaseSync {
+
+ private static final String INITIAL_MODE = "initial";
+ private static final String LATEST_OFFSET_MODE = "latest-offset";
+ private static final String TIMESTAMP_MODE = "timestamp";
+ public static final ConfigOption<Double> MONGO_CDC_CREATE_SAMPLE_PERCENT =
+ ConfigOptions.key("schema.sample-percent")
+ .doubleType()
+ .defaultValue(0.2)
+ .withDescription("mongo cdc sample percent");
+
+ public static final ConfigOption<String> TABLE_NAME =
+ ConfigOptions.key("table-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Table name of the Mongo database to monitor.");
+
+ public MongoDBDatabaseSync() throws SQLException {}
+
+ @Override
+ public void registerDriver() throws SQLException {}
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public List<SourceSchema> getSchemaList() throws Exception {
+ String databaseName = config.get(MongoDBSourceOptions.DATABASE);
+ List<SourceSchema> schemaList = new ArrayList<>();
+ MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
+
+ settingsBuilder.applyConnectionString(
+ new ConnectionString(
+ buildConnectionString(
+ config.get(MongoDBSourceOptions.USERNAME),
+ config.get(MongoDBSourceOptions.PASSWORD),
+ config.get(MongoDBSourceOptions.SCHEME),
+ config.get(MongoDBSourceOptions.HOSTS),
+ config.get(MongoDBSourceOptions.CONNECTION_OPTIONS))));
+
+ MongoClientSettings settings = settingsBuilder.build();
+ Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT);
+ try (MongoClient mongoClient = MongoClients.create(settings)) {
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);
+ MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
+ for (String collectionName : collectionNames) {
+ if (!isSyncNeeded(collectionName)) {
+ continue;
+ }
+ MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName);
+ Document firstDocument = collection.find().first();
+ if (firstDocument == null) {
+ throw new IllegalStateException("No documents in collection to infer schema");
+ }
+
+ long totalDocuments = collection.countDocuments();
+ long sampleSize = (long) Math.ceil(totalDocuments * samplePercent);
+ ArrayList<Document> documents = sampleData(collection, sampleSize);
+ MongoDBSchema mongoDBSchema =
+ new MongoDBSchema(documents, databaseName, collectionName, null);
+ mongoDBSchema.setModel(DataModel.UNIQUE);
+ schemaList.add(mongoDBSchema);
+ }
+ }
+
+ return schemaList;
+ }
+
+ private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) {
+ ArrayList<Document> query = new ArrayList<>();
+ query.add(new Document("$sample", new Document("size", sampleNum)));
+ return collection.aggregate(query).into(new ArrayList<>());
+ }
+
+ private static String buildConnectionString(
+ @Nullable String username,
+ @Nullable String password,
+ String scheme,
+ String hosts,
+ @Nullable String connectionOptions) {
+ StringBuilder sb = new StringBuilder(scheme).append("://");
+ if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
+ sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
+ }
+ sb.append(checkNotNull(hosts));
+ if (StringUtils.isNotEmpty(connectionOptions)) {
+ sb.append("/?").append(connectionOptions);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+ String hosts = config.get(MongoDBSourceOptions.HOSTS);
+ String username = config.get(MongoDBSourceOptions.USERNAME);
+ String password = config.get(MongoDBSourceOptions.PASSWORD);
+ String database = config.get(MongoDBSourceOptions.DATABASE);
+ String collection = config.get(MongoDBSourceOptions.COLLECTION);
+ if (StringUtils.isBlank(collection)) {
+ collection = config.get(TABLE_NAME);
+ }
+ MongoDBSourceBuilder<String> mongoDBSourceBuilder = MongoDBSource.builder();
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+ JsonDebeziumDeserializationSchema schema =
+ new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+ mongoDBSourceBuilder
+ .hosts(hosts)
+ .username(username)
+ .password(password)
+ .databaseList(database)
+ .collectionList(collection);
+
+ String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
+ switch (startupMode.toLowerCase()) {
+ case INITIAL_MODE:
+ mongoDBSourceBuilder.startupOptions(StartupOptions.initial());
+ break;
+ case LATEST_OFFSET_MODE:
+ mongoDBSourceBuilder.startupOptions(StartupOptions.latest());
+ break;
+ case TIMESTAMP_MODE:
+ mongoDBSourceBuilder.startupOptions(
+ StartupOptions.timestamp(
+ config.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported startup mode: " + startupMode);
+ }
+ MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build();
+ return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source");
+ }
+
+ @Override
+ public ParsingProcessFunction buildProcessFunction() {
+ return new MongoParsingProcessFunction(converter);
+ }
+
+ @Override
+ public DorisRecordSerializer<String> buildSchemaSerializer(
+ DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) {
+ return MongoDBJsonDebeziumSchemaSerializer.builder()
+ .setDorisOptions(dorisBuilder.build())
+ .setExecutionOptions(executionOptions)
+ .setTableMapping(tableMapping)
+ .setTableProperties(tableConfig)
+ .setTargetDatabase(database)
+ .build();
+ }
+
+ @Override
+ public String getTableListPrefix() {
+ return config.get(MongoDBSourceOptions.DATABASE);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
new file mode 100644
index 0000000..2c2e1b4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class MongoDBSchema extends SourceSchema {
+
+ public MongoDBSchema(
+ ArrayList<Document> sampleData,
+ String databaseName,
+ String tableName,
+ String tableComment)
+ throws Exception {
+ super(databaseName, null, tableName, tableComment);
+ fields = new LinkedHashMap<>();
+ for (Document data : sampleData) {
+ processSampleData(data);
+ }
+
+ primaryKeys = new ArrayList<>();
+ primaryKeys.add("_id");
+ }
+
+ private void processSampleData(Document sampleData) {
+ for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
+ String fieldName = entry.getKey();
+ Object value = entry.getValue();
+ String dorisType = MongoDBType.toDorisType(value);
+ if (isDecimalField(fieldName)) {
+ dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType);
+ }
+ fields.put(fieldName, new FieldSchema(fieldName, dorisType, null));
+ }
+ }
+
+ private boolean isDecimalField(String fieldName) {
+ FieldSchema existingField = fields.get(fieldName);
+ return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL);
+ }
+
+ private String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) {
+ FieldSchema existingField = fields.get(fieldName);
+ if (existingField.getTypeString().startsWith(DorisType.DECIMAL)) {
+ Tuple2<Integer, Integer> existingPrecisionAndScale =
+ MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString());
+ int existingPrecision = existingPrecisionAndScale.f0;
+ int existingScale = existingPrecisionAndScale.f1;
+
+ Tuple2<Integer, Integer> currentPrecisionAndScale =
+ MongoDBType.getDecimalPrecisionAndScale(newDorisType);
+ int currentPrecision = currentPrecisionAndScale.f0;
+ int currentScale = currentPrecisionAndScale.f1;
+
+ int newScale = Math.max(existingScale, currentScale);
+ int newIntegerPartSize =
+ Math.max(existingPrecision - existingScale, currentPrecision - currentScale);
+ int newPrecision = newIntegerPartSize + newScale;
+
+ return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")";
+ }
+ return newDorisType;
+ }
+
+ @Override
+ public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+ return null;
+ }
+
+ @Override
+ public String getCdcTableName() {
+ return databaseName + "\\." + tableName;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
new file mode 100644
index 0000000..bee85ce
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.bson.BsonArray;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MongoDBType {
+
+ public static final String DATE_TYPE = "$date";
+ public static final String DECIMAL_TYPE = "$numberDecimal";
+ public static final String LONG_TYPE = "$numberLong";
+
+ public static String toDorisType(Object value) {
+ if (value instanceof Integer) {
+ return DorisType.INT;
+ } else if (value instanceof Date) {
+ return DorisType.DATETIME_V2 + "(3)";
+ } else if (value instanceof Long) {
+ return DorisType.BIGINT;
+ } else if (value instanceof Double) {
+ return checkAndRebuildBigDecimal(new BigDecimal(String.valueOf(value)));
+ } else if (value instanceof Boolean) {
+ return DorisType.BOOLEAN;
+ } else if (value instanceof String) {
+ return DorisType.STRING;
+ } else if (value instanceof ObjectId) {
+ return DorisType.VARCHAR + "(30)";
+ } else if (value instanceof BsonArray) {
+ return DorisType.ARRAY;
+ } else if (value instanceof Decimal128) {
+ return checkAndRebuildBigDecimal(((Decimal128) value).bigDecimalValue());
+ } else {
+ return DorisType.STRING;
+ }
+ }
+
+ public static String jsonNodeToDorisType(JsonNode value) {
+ if (value instanceof IntNode) {
+ return DorisType.INT;
+ } else if (value instanceof TextNode) {
+ return DorisType.STRING;
+ } else if (value instanceof LongNode) {
+ return DorisType.BIGINT;
+ } else if (value instanceof DoubleNode) {
+ return DorisType.DOUBLE;
+ } else if (value instanceof BooleanNode) {
+ return DorisType.BOOLEAN;
+ } else if (value instanceof ArrayNode) {
+ return DorisType.ARRAY;
+ } else if (value instanceof DecimalNode) {
+ return checkAndRebuildBigDecimal(value.decimalValue());
+ } else if (value instanceof ObjectNode) {
+ if (value.size() == 1 && value.get(DATE_TYPE) != null) {
+ return DorisType.DATETIME_V2 + "(3)";
+ } else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) {
+ return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_TYPE).asText()));
+ } else if (value.size() == 1 && value.get(LONG_TYPE) != null) {
+ return DorisType.BIGINT;
+ } else {
+ return DorisType.STRING;
+ }
+ } else {
+ return DorisType.STRING;
+ }
+ }
+
+ public static Tuple2<Integer, Integer> getDecimalPrecisionAndScale(String decimalString) {
+ // Simplified regular expression to match two numbers in brackets
+ String regex = "\\((\\d+),(\\d+)\\)";
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(decimalString);
+
+ if (matcher.find()) {
+ Integer precision = Integer.parseInt(matcher.group(1));
+ Integer scale = Integer.parseInt(matcher.group(2));
+ return new Tuple2<>(precision, scale);
+ }
+ throw new DorisRuntimeException("Get Decimal precision and Scale error !");
+ }
+
+ public static String checkAndRebuildBigDecimal(BigDecimal decimal) {
+ if (decimal.scale() < 0) {
+ decimal = new BigDecimal(decimal.toPlainString());
+ }
+ return decimal.precision() <= 38
+ ? String.format(
+ "%s(%s,%s)",
+ DorisType.DECIMAL_V3, decimal.precision(), Math.max(decimal.scale(), 0))
+ : DorisType.STRING;
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java
new file mode 100644
index 0000000..b7a3ec7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+public class MongoDateConverter {
+ private static final ThreadLocal<DateTimeFormatter> dateFormatterThreadLocal =
+ ThreadLocal.withInitial(
+ () -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"));
+
+ public static String convertTimestampToString(long timestamp) {
+ Instant instant = Instant.ofEpochMilli(timestamp);
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"));
+ return dateFormatterThreadLocal.get().format(localDateTime);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
new file mode 100644
index 0000000..737617a
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import org.apache.doris.flink.tools.cdc.DatabaseSync.TableNameConverter;
+import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MongoParsingProcessFunction extends ParsingProcessFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(MongoParsingProcessFunction.class);
+
+ public MongoParsingProcessFunction(TableNameConverter converter) {
+ super(converter);
+ }
+
+ @Override
+ protected String getRecordTableName(String record) throws Exception {
+ JsonNode jsonNode = objectMapper.readValue(record, JsonNode.class);
+ if (jsonNode.get("ns") == null || jsonNode.get("ns") instanceof NullNode) {
+ LOG.error("Failed to get cdc namespace");
+ throw new RuntimeException();
+ }
+ JsonNode nameSpace = jsonNode.get("ns");
+ return extractJsonNode(nameSpace, "coll");
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
new file mode 100644
index 0000000..ef66e88
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcMongoSyncDatabaseCase {
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ // conf.setString(RestOptions.BIND_PORT, "8018");
+ // conf.setString("rest.flamegraph.enabled", "true");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // StreamExecutionEnvironment env =
+ // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+ Map<String, String> flinkMap = new HashMap<>();
+ flinkMap.put("execution.checkpointing.interval", "10s");
+ flinkMap.put("pipeline.operator-chaining", "false");
+ flinkMap.put("parallelism.default", "8");
+
+ String database = "cdc_test";
+ String tablePrefix = "";
+ String tableSuffix = "";
+
+ Configuration configuration = Configuration.fromMap(flinkMap);
+ env.configure(configuration);
+ Map<String, String> mongoConfig = new HashMap<>();
+ mongoConfig.put("database", "test");
+ mongoConfig.put("hosts", "127.0.0.1:27017");
+ mongoConfig.put("username", "flinkuser");
+ // mysqlConfig.put("password","");
+ mongoConfig.put("password", "flinkpwd");
+ // mongoConfig.put("scan.startup.mode", "latest-offset");
+ mongoConfig.put("scan.startup.mode", "initial");
+ mongoConfig.put("schema.sample-percent", "1");
+ Configuration config = Configuration.fromMap(mongoConfig);
+
+ Map<String, String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes", "127.0.0.1:8030");
+ // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
+ sinkConfig.put("username", "root");
+ sinkConfig.put("password", "");
+ sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ sinkConfig.put("auto-redirect", "false");
+ // sinkConfig.put("sink.enable.batch-mode","true");
+ // sinkConfig.put("sink.write-mode","stream_load_batch");
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String, String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+ tableConfig.put("table-buckets", ".*:1");
+ String includingTables = "cdc_test";
+ // String includingTables = "a_.*|b_.*|c";
+ String excludingTables = "";
+ String multiToOneOrigin = "a_.*|b_.*";
+ String multiToOneTarget = "a|b";
+ boolean ignoreDefaultValue = false;
+ // boolean useNewSchemaChange = false;
+ DatabaseSync databaseSync = new MongoDBDatabaseSync();
+ databaseSync
+ .setEnv(env)
+ .setDatabase(database)
+ .setConfig(config)
+ .setTablePrefix(tablePrefix)
+ .setTableSuffix(tableSuffix)
+ .setIncludingTables(includingTables)
+ .setExcludingTables(excludingTables)
+ .setMultiToOneOrigin(multiToOneOrigin)
+ .setMultiToOneTarget(multiToOneTarget)
+ .setIgnoreDefaultValue(ignoreDefaultValue)
+ .setSinkConfig(sinkConf)
+ .setTableConfig(tableConfig)
+ .setCreateTableOnly(false)
+ // .setSingleSink(true)
+ // .setNewSchemaChange(useNewSchemaChange)
+ .create();
+ databaseSync.build();
+ env.execute(String.format("Mongo-Doris Database Sync: %s", database));
+ }
+}