add sqlserver cdc (#181)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 73a748c..333a40b 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -261,6 +261,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
+ <version>2.4.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 754dbce..6a390ea 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -20,6 +20,7 @@
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -37,6 +38,7 @@
private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
+ private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database";
private static final List<String> EMPTY_KEYS = Arrays.asList("password");
public static void main(String[] args) throws Exception {
@@ -53,6 +55,9 @@
case POSTGRES_SYNC_DATABASE:
createPostgresSyncDatabase(opArgs);
break;
+ case SQLSERVER_SYNC_DATABASE:
+ createSqlServerSyncDatabase(opArgs);
+ break;
default:
System.out.println("Unknown operation " + operation);
System.exit(1);
@@ -83,6 +88,14 @@
syncDatabase(params, databaseSync, postgresConfig, "Postgres");
}
+ private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
+ Configuration postgresConfig = Configuration.fromMap(postgresMap);
+ DatabaseSync databaseSync = new SqlServerDatabaseSync();
+ syncDatabase(params, databaseSync, postgresConfig, "SqlServer");
+ }
+
private static void syncDatabase(MultipleParameterTool params, DatabaseSync databaseSync, Configuration config, String type) throws Exception {
String jobName = params.get("job-name");
String database = params.get("database");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
index 87cbde2..8886c09 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java
@@ -78,7 +78,7 @@
case BIGSERIAL:
return DorisType.BIGINT;
case NUMERIC:
- return precision != null && precision <= 38
+ return precision != null && precision > 0 && precision <= 38
? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
: DorisType.STRING;
case FLOAT4:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
new file mode 100644
index 0000000..6cf9c9d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+
+import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
+import com.ververica.cdc.connectors.base.options.SourceOptions;
+import com.ververica.cdc.connectors.base.options.StartupOptions;
+import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
+import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+
+public class SqlServerDatabaseSync extends DatabaseSync {
+ private static final Logger LOG = LoggerFactory.getLogger(SqlServerDatabaseSync.class);
+ private static String JDBC_URL = "jdbc:sqlserver://%s:%d;database=%s";
+ private static String PORT = "port";
+
+ public SqlServerDatabaseSync() {
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ String jdbcUrl = String.format(JDBC_URL, config.get(JdbcSourceOptions.HOSTNAME), config.getInteger(PORT, 1433),config.get(JdbcSourceOptions.DATABASE_NAME));
+ Properties pro = new Properties();
+ pro.setProperty("user", config.get(JdbcSourceOptions.USERNAME));
+ pro.setProperty("password", config.get(JdbcSourceOptions.PASSWORD));
+ return DriverManager.getConnection(jdbcUrl, pro);
+ }
+
+ @Override
+ public List<SourceSchema> getSchemaList() throws Exception {
+ String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+ List<SourceSchema> schemaList = new ArrayList<>();
+ LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+ try (Connection conn = getConnection()) {
+ DatabaseMetaData metaData = conn.getMetaData();
+ try (ResultSet tables =
+ metaData.getTables(databaseName, null, "%", new String[]{"TABLE"})) {
+ while (tables.next()) {
+ String tableName = tables.getString("TABLE_NAME");
+ String tableComment = tables.getString("REMARKS");
+ if (!isSyncNeeded(tableName)) {
+ continue;
+ }
+ SourceSchema sourceSchema =
+ new SqlServerSchema(metaData, databaseName, null, tableName, tableComment);
+ sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE);
+ schemaList.add(sourceSchema);
+ }
+ }
+ }
+ return schemaList;
+ }
+
+ @Override
+ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
+ String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+ String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+ Preconditions.checkNotNull(databaseName, "database-name in sqlserver is required");
+ Preconditions.checkNotNull(databaseName, "schema-name in sqlserver is required");
+
+ String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
+ String hostname = config.get(JdbcSourceOptions.HOSTNAME);
+ Integer port = config.getInteger(PORT, 1433);
+ String username = config.get(JdbcSourceOptions.USERNAME);
+ String password = config.get(JdbcSourceOptions.PASSWORD);
+
+ StartupOptions startupOptions = StartupOptions.initial();
+ String startupMode = config.get(JdbcSourceOptions.SCAN_STARTUP_MODE);
+ if ("initial".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.initial();
+ } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+ startupOptions = StartupOptions.latest();
+ }
+
+ //debezium properties set
+ Properties debeziumProperties = new Properties();
+ debeziumProperties.putAll(SqlServerDateConverter.DEFAULT_PROPS);
+ debeziumProperties.put("decimal.handling.mode", "string");
+
+ for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+ debeziumProperties.put(
+ key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+ }
+ }
+
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ JsonDebeziumDeserializationSchema schema =
+ new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+ if(config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false)){
+ JdbcIncrementalSource<String> incrSource = SqlServerSourceBuilder.SqlServerIncrementalSource.<String>builder()
+ .hostname(hostname)
+ .port(port)
+ .databaseList(databaseName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .startupOptions(startupOptions)
+ .deserializer(schema)
+ .includeSchemaChanges(true)
+ .debeziumProperties(debeziumProperties)
+ .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+ .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+ .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+ .connectTimeout(config.get(CONNECT_TIMEOUT))
+ .connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+ .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+ .distributionFactorUpper(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+ .distributionFactorLower(config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+ .build();
+ return env.fromSource(incrSource, WatermarkStrategy.noWatermarks(), "SqlServer IncrSource");
+ }else{
+ DebeziumSourceFunction<String> sqlServerSource = SqlServerSource.<String>builder()
+ .hostname(hostname)
+ .port(port)
+ .database(databaseName)
+ .tableList(schemaName + "." + tableName)
+ .username(username)
+ .password(password)
+ .debeziumProperties(debeziumProperties)
+ .startupOptions(startupOptions)
+ .deserializer(schema)
+ .build();
+ return env.addSource(sqlServerSource, "SqlServer Source");
+ }
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
new file mode 100644
index 0000000..9f8a4a0
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDateConverter.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class SqlServerDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+ private static final Logger log = LoggerFactory.getLogger(SqlServerDateConverter.class);
+ private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+ private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+
+ public static Properties DEFAULT_PROPS = new Properties();
+
+ static {
+ DEFAULT_PROPS.setProperty("converters", "date");
+ DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDateConverter");
+ DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+ DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss.SSSSSS");
+ }
+
+ @Override
+ public void configure(Properties props) {
+ readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
+ readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
+ }
+
+ private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+ String settingValue = (String) properties.get(settingKey);
+ if (settingValue == null || settingValue.length() == 0) {
+ return;
+ }
+ try {
+ callback.accept(settingValue.trim());
+ } catch (IllegalArgumentException | DateTimeException e) {
+ log.error("setting {} is illegal:{}", settingKey, settingValue);
+ throw e;
+ }
+ }
+
+ @Override
+ public void converterFor(RelationalColumn column, CustomConverter.ConverterRegistration<SchemaBuilder> registration) {
+ String sqlType = column.typeName().toUpperCase();
+ SchemaBuilder schemaBuilder = null;
+ CustomConverter.Converter converter = null;
+ if ("DATE".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertDate;
+ }
+ if ("SMALLDATETIME".equals(sqlType) || "DATETIME".equals(sqlType) || "DATETIME2".equals(sqlType)) {
+ schemaBuilder = SchemaBuilder.string().optional();
+ converter = this::convertDateTime;
+ }
+ if (schemaBuilder != null) {
+ registration.register(schemaBuilder, converter);
+ }
+ }
+
+ private Object convertDateTime(Object input) {
+ if (input instanceof Timestamp) {
+ return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+ }
+ return null;
+ }
+
+ private String convertDate(Object input) {
+ if (input instanceof Date){
+ return dateFormatter.format(((Date) input).toLocalDate());
+ }
+ return null;
+ }
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
new file mode 100644
index 0000000..36ecb42
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerSchema.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+
+import java.sql.DatabaseMetaData;
+
+public class SqlServerSchema extends SourceSchema {
+
+ public SqlServerSchema(DatabaseMetaData metaData, String databaseName, String schemaName, String tableName, String tableComment) throws Exception {
+ super(metaData, databaseName, schemaName, tableName, tableComment);
+ }
+
+ @Override
+ public String convertToDorisType(String fieldType, Integer precision, Integer scale) {
+ return SqlServerType.toDorisType(fieldType, precision, scale);
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
new file mode 100644
index 0000000..aedb16f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerType.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc.sqlserver;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+
+public class SqlServerType {
+ private static final String BIT = "bit";
+ private static final String TINYINT = "tinyint";
+ private static final String SMALLINT = "smallint";
+ private static final String INT = "int";
+ private static final String BIGINT = "bigint";
+ private static final String REAL = "real";
+ private static final String FLOAT = "float";
+ private static final String MONEY = "money";
+ private static final String SMALLMONEY = "smallmoney";
+ private static final String DECIMAL = "decimal";
+ private static final String NUMERIC = "numeric";
+ private static final String DATE = "date";
+ private static final String DATETIME = "datetime";
+ private static final String DATETIME2 = "datetime2";
+ private static final String SMALLDATETIME = "smalldatetime";
+ private static final String CHAR = "char";
+ private static final String VARCHAR = "varchar";
+ private static final String NCHAR = "nchar";
+ private static final String NVARCHAR = "nvarchar";
+ private static final String TEXT = "text";
+ private static final String NTEXT = "ntext";
+ private static final String TIME = "time";
+ private static final String DATETIMEOFFSET = "datetimeoffset";
+ private static final String IMAGE = "image";
+ private static final String BINARY = "binary";
+ private static final String VARBINARY = "varbinary";
+
+ public static String toDorisType(String sqlServerType, Integer precision, Integer scale) {
+ sqlServerType = sqlServerType.toLowerCase();
+ switch (sqlServerType){
+ case BIT:
+ return DorisType.BOOLEAN;
+ case TINYINT:
+ return DorisType.TINYINT;
+ case SMALLINT:
+ return DorisType.SMALLINT;
+ case INT:
+ return DorisType.INT;
+ case BIGINT:
+ return DorisType.BIGINT;
+ case REAL:
+ return DorisType.FLOAT;
+ case FLOAT:
+ return DorisType.DOUBLE;
+ case MONEY:
+ return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 19, 4);
+ case SMALLMONEY:
+ return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 10, 4);
+ case NUMERIC:
+ return precision != null && precision > 0 && precision <= 38
+ ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
+ : DorisType.STRING;
+ case DATE:
+ return DorisType.DATE_V2;
+ case DATETIME:
+ case DATETIME2:
+ case SMALLDATETIME:
+ return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
+ case CHAR:
+ case VARCHAR:
+ case NCHAR:
+ case NVARCHAR:
+ return precision * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+ case TEXT:
+ case NTEXT:
+ case TIME:
+ case DATETIMEOFFSET:
+ return DorisType.STRING;
+ default:
+ throw new UnsupportedOperationException("Unsupported SqlServer Type: " + sqlServerType);
+ }
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
new file mode 100644
index 0000000..96780aa
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcSqlServerSyncDatabaseCase {
+
+ public static void main(String[] args) throws Exception{
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.disableOperatorChaining();
+ env.enableCheckpointing(10000);
+
+// Map<String,String> flinkMap = new HashMap<>();
+// flinkMap.put("execution.checkpointing.interval","10s");
+// flinkMap.put("pipeline.operator-chaining","false");
+// flinkMap.put("parallelism.default","1");
+//
+//
+// Configuration configuration = Configuration.fromMap(flinkMap);
+// env.configure(configuration);
+
+ String database = "db2";
+ String tablePrefix = "";
+ String tableSuffix = "";
+ Map<String,String> sourceConfig = new HashMap<>();
+ sourceConfig.put("database-name","CDC_DB");
+ sourceConfig.put("schema-name","dbo");
+ sourceConfig.put("hostname","127.0.0.1");
+ sourceConfig.put("port","1433");
+ sourceConfig.put("username","sa");
+ sourceConfig.put("password","123456");
+// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+// sourceConfig.put("scan.incremental.snapshot.enabled","true");
+// sourceConfig.put("debezium.include.schema.changes","false");
+
+ Configuration config = Configuration.fromMap(sourceConfig);
+
+ Map<String,String> sinkConfig = new HashMap<>();
+ sinkConfig.put("fenodes","127.0.0.1:8030");
+ sinkConfig.put("username","root");
+ sinkConfig.put("password","");
+ sinkConfig.put("jdbc-url","jdbc:mysql://127.0.0.1:9030");
+ sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+ Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+ Map<String,String> tableConfig = new HashMap<>();
+ tableConfig.put("replication_num", "1");
+
+ String includingTables = "products_test";
+ String excludingTables = "";
+ boolean ignoreDefaultValue = false;
+ boolean useNewSchemaChange = false;
+ DatabaseSync databaseSync = new SqlServerDatabaseSync();
+ databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
+ databaseSync.build();
+ env.execute(String.format("Postgres-Doris Database Sync: %s", database));
+
+ }
+}