[fix](cdc) configure table buckets for a single sink (#307)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 08e8f05..e5f6994 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -35,6 +35,7 @@
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import org.apache.doris.flink.sink.writer.EventType;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
 import org.apache.doris.flink.tools.cdc.oracle.OracleType;
@@ -246,9 +247,34 @@
         Preconditions.checkArgument(split.length == 2);
         tableSchema.setDatabase(split[0]);
         tableSchema.setTable(split[1]);
+        if (tableProperties.containsKey("table-buckets")) {
+            String tableBucketsConfig = tableProperties.get("table-buckets");
+            Map<String, Integer> tableBuckets = DatabaseSync.getTableBuckets(tableBucketsConfig);
+            Integer buckets = getTableSchemaBuckets(tableBuckets, tableSchema.getTable());
+            tableSchema.setTableBuckets(buckets);
+        }
         return tableSchema;
     }
 
+    @VisibleForTesting
+    public Integer getTableSchemaBuckets(Map<String, Integer> tableBucketsMap, String tableName) {
+        if (tableBucketsMap != null) {
+            // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
+            if (tableBucketsMap.containsKey(tableName)) {
+                return tableBucketsMap.get(tableName);
+            }
+            // Secondly, iterate over the map to find a corresponding regular expression match,
+            for (Map.Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+
+                Pattern pattern = Pattern.compile(entry.getKey());
+                if (pattern.matcher(tableName).matches()) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return null;
+    }
+
     private List<String> buildDistributeKeys(
             List<String> primaryKeys, Map<String, FieldSchema> fields) {
         if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e71ed51..e54e992 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -362,7 +362,7 @@
      * @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30
      * @return The table name and buckets map. The key is table name, the value is buckets.
      */
-    public Map<String, Integer> getTableBuckets(String tableBuckets) {
+    public static Map<String, Integer> getTableBuckets(String tableBuckets) {
         Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
         String[] tableBucketsArray = tableBuckets.split(",");
         for (String tableBucket : tableBucketsArray) {