[Feature][Druid]Support multi table for druid sink (#7023)

diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
index 318f3a1..99758c7 100644
--- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
@@ -20,6 +20,7 @@
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -32,7 +33,8 @@
 import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
 import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
 
-public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
 
     private ReadonlyConfig config;
     private CatalogTable catalogTable;
diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
index 44e8878..0f78ba0 100644
--- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
@@ -18,8 +18,11 @@
 
 package org.apache.seatunnel.connectors.druid.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -27,6 +30,9 @@
 
 import com.google.auto.service.AutoService;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
 import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
 
@@ -44,7 +50,48 @@
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
-        return () -> new DruidSink(context.getOptions(), catalogTable);
+
+        ReadonlyConfig finalReadonlyConfig =
+                generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+        return () -> new DruidSink(finalReadonlyConfig, catalogTable);
+    }
+
+    private ReadonlyConfig generateCurrentReadonlyConfig(
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+
+        Map<String, String> configMap = readonlyConfig.toMap();
+
+        readonlyConfig
+                .getOptional(DATASOURCE)
+                .ifPresent(
+                        tableName -> {
+                            String replacedPath =
+                                    replaceCatalogTableInPath(tableName, catalogTable);
+                            configMap.put(DATASOURCE.key(), replacedPath);
+                        });
+
+        return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+    }
+
+    private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) {
+        String tableName = originTableName;
+        TableIdentifier tableIdentifier = catalogTable.getTableId();
+        if (tableIdentifier != null) {
+            if (tableIdentifier.getSchemaName() != null) {
+                tableName =
+                        tableName.replace(
+                                SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+                                tableIdentifier.getSchemaName());
+            }
+            if (tableIdentifier.getTableName() != null) {
+                tableName =
+                        tableName.replace(
+                                SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+                                tableIdentifier.getTableName());
+            }
+        }
+        return tableName;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
index 3f7709b..0ebb1c4 100644
--- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.seatunnel.connectors.druid.sink;
 
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -65,7 +66,8 @@
 import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
-public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DruidWriter.class);
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
index 1639636..21bf3c5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
@@ -20,6 +20,7 @@
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -58,8 +59,11 @@
         disabledReason = "The RoaringBitmap version is not compatible in docker container")
 public class DruidIT extends TestSuiteBase implements TestResource {
 
-    private static final String datasource = "testDataSource";
-    private static final String sqlQuery = "SELECT * FROM " + datasource;
+    private static final String DATASOURCE = "testDataSource";
+    private static final String MULTI_DATASOURCE_1 = "druid_sink_1";
+    private static final String MULTI_DATASOURCE_2 = "druid_sink_2";
+    private static final String SQL_QUERY_TEMPLATE = "SELECT * FROM ";
+    private static final String CONF_PREFIX = "src/test/resources";
     private static final String DRUID_SERVICE_NAME = "router";
     private static final int DRUID_SERVICE_PORT = 8888;
     private DockerComposeContainer environment;
@@ -76,7 +80,8 @@
                                 Wait.forListeningPort()
                                         .withStartupTimeout(Duration.ofSeconds(360)));
         environment.start();
-        changeCoordinatorURLConf();
+        changeCoordinatorURLConf(CONF_PREFIX + "/fakesource_to_druid.conf");
+        changeCoordinatorURLConf(CONF_PREFIX + "/fakesource_to_druid_with_multi.conf");
     }
 
     @AfterAll
@@ -90,39 +95,64 @@
         Container.ExecResult execResult = container.executeJob("/fakesource_to_druid.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
         while (true) {
-            try (CloseableHttpClient client = HttpClients.createDefault()) {
-                HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql");
-                String jsonRequest = "{\"query\": \"" + sqlQuery + "\"}";
-                StringEntity entity = new StringEntity(jsonRequest);
-                entity.setContentType("application/json");
-                request.setEntity(entity);
-                HttpResponse response = client.execute(request);
-                String responseBody = EntityUtils.toString(response.getEntity());
-                String expectedDataRow1 =
-                        "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
-                String expectedDataRow2 =
-                        "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
-                String expectedDataRow3 =
-                        "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
-                String expectedDataRow4 =
-                        "\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
+            String responseBody = getSelectResponse(DATASOURCE);
+            String expectedDataRow1 =
+                    "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
+            String expectedDataRow2 =
+                    "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
+            String expectedDataRow3 =
+                    "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
+            String expectedDataRow4 =
+                    "\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
 
-                if (!responseBody.contains("errorMessage")) {
-                    // Check sink data
-                    Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
-                    Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
-                    Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
-                    Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
-                    break;
-                }
-                Thread.sleep(1000);
+            if (!responseBody.contains("errorMessage")) {
+                // Check sink data
+                Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
+                Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
+                Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
+                Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
+                break;
             }
+            Thread.sleep(1000);
         }
     }
 
-    private void changeCoordinatorURLConf() throws UnknownHostException {
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+    @TestTemplate
+    public void testDruidMultiSink(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fakesource_to_druid_with_multi.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // Check multi sink table 1
+        while (true) {
+            String responseBody = getSelectResponse(MULTI_DATASOURCE_1);
+            String expectedDataRow =
+                    "\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\"";
+
+            if (!responseBody.contains("errorMessage")) {
+                Assertions.assertEquals(responseBody.contains(expectedDataRow), true);
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        // Check multi sink table 2
+        while (true) {
+            String responseBody = getSelectResponse(MULTI_DATASOURCE_2);
+            String expectedDataRow =
+                    "\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3";
+            if (!responseBody.contains("errorMessage")) {
+                Assertions.assertEquals(responseBody.contains(expectedDataRow), true);
+                break;
+            }
+            Thread.sleep(1000);
+        }
+    }
+
+    private void changeCoordinatorURLConf(String resourceFilePath) throws UnknownHostException {
         coordinatorURL = InetAddress.getLocalHost().getHostAddress() + ":8888";
-        String resourceFilePath = "src/test/resources/fakesource_to_druid.conf";
         Path path = Paths.get(resourceFilePath);
         try {
             List<String> lines = Files.readAllLines(path);
@@ -145,4 +175,17 @@
             throw new RuntimeException("Change conf error", e);
         }
     }
+
+    private String getSelectResponse(String datasource) throws IOException {
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql");
+            String jsonRequest = "{\"query\": \"" + SQL_QUERY_TEMPLATE + datasource + "\"}";
+            StringEntity entity = new StringEntity(jsonRequest);
+            entity.setContentType("application/json");
+            request.setEntity(entity);
+            HttpResponse response = client.execute(request);
+            String responseBody = EntityUtils.toString(response.getEntity());
+            return responseBody;
+        }
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf
new file mode 100644
index 0000000..a66cde8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    tables_configs = [
+       {
+        schema = {
+          table = "druid_sink_1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "druid_sink_2"
+              fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Druid {
+    coordinatorUrl = "localhost:8888"
+    datasource = "${table_name}"
+  }
+}