[Feature][Druid] Support multi-table for Druid sink (#7023)
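
This patch lets the Druid sink participate in SeaTunnel's multi-table sink mode: `DruidSink` and `DruidWriter` now declare the `SupportMultiTableSink` / `SupportMultiTableSinkWriter` marker interfaces, and `DruidSinkFactory` resolves placeholder keys in the `datasource` option (for example `${table_name}`) against the upstream catalog table, so a single sink block can write each source table to its own Druid datasource. A new e2e case (`fakesource_to_druid_with_multi.conf`) exercises two FakeSource tables.
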
diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
index 318f3a1..99758c7 100644
--- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -32,7 +33,8 @@
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
-public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class DruidSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+ implements SupportMultiTableSink {
private ReadonlyConfig config;
private CatalogTable catalogTable;
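
`SupportMultiTableSink` is a marker-style interface on the sink side: declaring it is how the sink advertises that one sink job can host several upstream tables. The per-table write path comes from the matching `DruidWriter` change further down.
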
diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
index 44e8878..0f78ba0 100644
--- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java
@@ -18,8 +18,11 @@
package org.apache.seatunnel.connectors.druid.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -27,6 +30,9 @@
import com.google.auto.service.AutoService;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE;
@@ -44,7 +50,48 @@
@Override
public TableSink createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
- return () -> new DruidSink(context.getOptions(), catalogTable);
+
+ ReadonlyConfig finalReadonlyConfig =
+ generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+ return () -> new DruidSink(finalReadonlyConfig, catalogTable);
+ }
+
+ private ReadonlyConfig generateCurrentReadonlyConfig(
+ ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+
+ Map<String, String> configMap = readonlyConfig.toMap();
+
+ readonlyConfig
+ .getOptional(DATASOURCE)
+ .ifPresent(
+ tableName -> {
+ String replacedPath =
+ replaceCatalogTableInPath(tableName, catalogTable);
+ configMap.put(DATASOURCE.key(), replacedPath);
+ });
+
+ return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+ }
+
+ private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) {
+ String tableName = originTableName;
+ TableIdentifier tableIdentifier = catalogTable.getTableId();
+ if (tableIdentifier != null) {
+ if (tableIdentifier.getSchemaName() != null) {
+ tableName =
+ tableName.replace(
+ SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+ tableIdentifier.getSchemaName());
+ }
+ if (tableIdentifier.getTableName() != null) {
+ tableName =
+ tableName.replace(
+ SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+ tableIdentifier.getTableName());
+ }
+ }
+ return tableName;
}
}
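
The substitution above is plain string replacement, applied once per catalog table before the sink is constructed, so each call to `createSink` gets its own resolved `datasource` value. A standalone sketch of the behavior, assuming `SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY` is the literal `${table_name}` (the value the multi-table e2e config below relies on) and `REPLACE_SCHEMA_NAME_KEY` is `${schema_name}` — both constant values are inferred here, not quoted from the class:

```java
// Standalone sketch of the factory's placeholder substitution; the constant
// values below are assumptions inferred from the e2e config, not the actual
// SinkReplaceNameConstant source.
public class DatasourceNameDemo {
    static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}";
    static final String REPLACE_TABLE_NAME_KEY = "${table_name}";

    static String resolveDatasource(String template, String schemaName, String tableName) {
        String result = template;
        if (schemaName != null) {
            result = result.replace(REPLACE_SCHEMA_NAME_KEY, schemaName);
        }
        if (tableName != null) {
            result = result.replace(REPLACE_TABLE_NAME_KEY, tableName);
        }
        return result;
    }

    public static void main(String[] args) {
        // "${table_name}" resolves to "druid_sink_1" for the first FakeSource table.
        System.out.println(resolveDatasource("${table_name}", null, "druid_sink_1"));
    }
}
```
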
diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
index 3f7709b..0ebb1c4 100644
--- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
+++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidWriter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.druid.sink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -65,7 +66,8 @@
import java.util.StringJoiner;
import java.util.stream.Collectors;
-public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class DruidWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+ implements SupportMultiTableSinkWriter<Void> {
private static final Logger LOG = LoggerFactory.getLogger(DruidWriter.class);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
index 1639636..21bf3c5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/java/org/apache/seatunnel/e2e/connector/druid/DruidIT.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -58,8 +59,11 @@
disabledReason = "The RoaringBitmap version is not compatible in docker container")
public class DruidIT extends TestSuiteBase implements TestResource {
- private static final String datasource = "testDataSource";
- private static final String sqlQuery = "SELECT * FROM " + datasource;
+ private static final String DATASOURCE = "testDataSource";
+ private static final String MULTI_DATASOURCE_1 = "druid_sink_1";
+ private static final String MULTI_DATASOURCE_2 = "druid_sink_2";
+ private static final String SQL_QUERY_TEMPLATE = "SELECT * FROM ";
+ private static final String CONF_PREFIX = "src/test/resources";
private static final String DRUID_SERVICE_NAME = "router";
private static final int DRUID_SERVICE_PORT = 8888;
private DockerComposeContainer environment;
@@ -76,7 +80,8 @@
Wait.forListeningPort()
.withStartupTimeout(Duration.ofSeconds(360)));
environment.start();
- changeCoordinatorURLConf();
+ changeCoordinatorURLConf(CONF_PREFIX + "/fakesource_to_druid.conf");
+ changeCoordinatorURLConf(CONF_PREFIX + "/fakesource_to_druid_with_multi.conf");
}
@AfterAll
@@ -90,39 +95,64 @@
Container.ExecResult execResult = container.executeJob("/fakesource_to_druid.conf");
Assertions.assertEquals(0, execResult.getExitCode());
while (true) {
- try (CloseableHttpClient client = HttpClients.createDefault()) {
- HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql");
- String jsonRequest = "{\"query\": \"" + sqlQuery + "\"}";
- StringEntity entity = new StringEntity(jsonRequest);
- entity.setContentType("application/json");
- request.setEntity(entity);
- HttpResponse response = client.execute(request);
- String responseBody = EntityUtils.toString(response.getEntity());
- String expectedDataRow1 =
- "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
- String expectedDataRow2 =
- "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
- String expectedDataRow3 =
- "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
- String expectedDataRow4 =
- "\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
+ String responseBody = getSelectResponse(DATASOURCE);
+ String expectedDataRow1 =
+ "\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
+ String expectedDataRow2 =
+ "\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
+ String expectedDataRow3 =
+ "\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
+ String expectedDataRow4 =
+ "\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
- if (!responseBody.contains("errorMessage")) {
- // Check sink data
- Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
- Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
- Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
- Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
- break;
- }
- Thread.sleep(1000);
+ if (!responseBody.contains("errorMessage")) {
+ // Check sink data
+ Assertions.assertTrue(responseBody.contains(expectedDataRow1));
+ Assertions.assertTrue(responseBody.contains(expectedDataRow2));
+ Assertions.assertTrue(responseBody.contains(expectedDataRow3));
+ Assertions.assertTrue(responseBody.contains(expectedDataRow4));
+ break;
}
+ Thread.sleep(1000);
}
}
- private void changeCoordinatorURLConf() throws UnknownHostException {
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+ @TestTemplate
+ public void testDruidMultiSink(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/fakesource_to_druid_with_multi.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // Check multi sink table 1
+ while (true) {
+ String responseBody = getSelectResponse(MULTI_DATASOURCE_1);
+ String expectedDataRow =
+ "\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\"";
+
+ if (!responseBody.contains("errorMessage")) {
+ Assertions.assertTrue(responseBody.contains(expectedDataRow));
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ // Check multi sink table 2
+ while (true) {
+ String responseBody = getSelectResponse(MULTI_DATASOURCE_2);
+ String expectedDataRow =
+ "\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3";
+ if (!responseBody.contains("errorMessage")) {
+ Assertions.assertTrue(responseBody.contains(expectedDataRow));
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ }
+
+ private void changeCoordinatorURLConf(String resourceFilePath) throws UnknownHostException {
coordinatorURL = InetAddress.getLocalHost().getHostAddress() + ":8888";
- String resourceFilePath = "src/test/resources/fakesource_to_druid.conf";
Path path = Paths.get(resourceFilePath);
try {
List<String> lines = Files.readAllLines(path);
@@ -145,4 +175,17 @@
throw new RuntimeException("Change conf error", e);
}
}
+
+ private String getSelectResponse(String datasource) throws IOException {
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ HttpPost request = new HttpPost("http://" + coordinatorURL + "/druid/v2/sql");
+ String jsonRequest = "{\"query\": \"" + SQL_QUERY_TEMPLATE + datasource + "\"}";
+ StringEntity entity = new StringEntity(jsonRequest);
+ entity.setContentType("application/json");
+ request.setEntity(entity);
+ HttpResponse response = client.execute(request);
+ return EntityUtils.toString(response.getEntity());
+ }
+ }
}
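
Both tests share the same wait-for-ingestion idiom: poll the router's SQL endpoint until the response stops carrying an `errorMessage` field, which Druid returns while the datasource does not exist yet. A self-contained sketch of that pattern with an explicit deadline — the `awaitDatasource` helper and its timeout are illustrative, not part of this patch:

```java
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class DruidSqlPoller {

    // Polls the Druid SQL endpoint until the datasource is queryable or the
    // deadline passes, mirroring the while(true)/sleep loop in the tests above
    // but with a bound so a broken ingestion fails fast instead of hanging.
    static String awaitDatasource(String routerUrl, String datasource, long timeoutMillis)
            throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpPost request = new HttpPost("http://" + routerUrl + "/druid/v2/sql");
                StringEntity entity =
                        new StringEntity("{\"query\": \"SELECT * FROM " + datasource + "\"}");
                entity.setContentType("application/json");
                request.setEntity(entity);
                HttpResponse response = client.execute(request);
                String body = EntityUtils.toString(response.getEntity());
                if (!body.contains("errorMessage")) {
                    return body;
                }
            }
            Thread.sleep(1000);
        }
        throw new IllegalStateException("Datasource " + datasource + " was not queryable in time");
    }
}
```
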
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf
new file mode 100644
index 0000000..a66cde8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-druid-e2e/src/test/resources/fakesource_to_druid_with_multi.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ tables_configs = [
+ {
+ schema = {
+ table = "druid_sink_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "druid_sink_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Druid {
+ coordinatorUrl = "localhost:8888"
+ datasource = "${table_name}"
+ }
+}