[fix](cdc) configure table buckets for a single sink (#307)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 08e8f05..e5f6994 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -35,6 +35,7 @@
import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceConnector;
import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
import org.apache.doris.flink.tools.cdc.oracle.OracleType;
@@ -246,9 +247,34 @@
Preconditions.checkArgument(split.length == 2);
tableSchema.setDatabase(split[0]);
tableSchema.setTable(split[1]);
+ if (tableProperties.containsKey("table-buckets")) {
+ String tableBucketsConfig = tableProperties.get("table-buckets");
+ Map<String, Integer> tableBuckets = DatabaseSync.getTableBuckets(tableBucketsConfig);
+ Integer buckets = getTableSchemaBuckets(tableBuckets, tableSchema.getTable());
+ tableSchema.setTableBuckets(buckets);
+ }
return tableSchema;
}
+ @VisibleForTesting
+ public Integer getTableSchemaBuckets(Map<String, Integer> tableBucketsMap, String tableName) {
+ if (tableBucketsMap != null) {
+ // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
+ if (tableBucketsMap.containsKey(tableName)) {
+ return tableBucketsMap.get(tableName);
+ }
+ // Secondly, iterate over the map to find a corresponding regular expression match,
+ for (Map.Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+
+ Pattern pattern = Pattern.compile(entry.getKey());
+ if (pattern.matcher(tableName).matches()) {
+ return entry.getValue();
+ }
+ }
+ }
+ return null;
+ }
+
private List<String> buildDistributeKeys(
List<String> primaryKeys, Map<String, FieldSchema> fields) {
if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e71ed51..e54e992 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -362,7 +362,7 @@
* @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30
* @return The table name and buckets map. The key is table name, the value is buckets.
*/
- public Map<String, Integer> getTableBuckets(String tableBuckets) {
+ public static Map<String, Integer> getTableBuckets(String tableBuckets) {
Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
String[] tableBucketsArray = tableBuckets.split(",");
for (String tableBucket : tableBucketsArray) {