[Test] add itcases for DorisSource and Dorissink (#270)
Add integration tests and add them to the ci
diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml
new file mode 100644
index 0000000..624ccaa
--- /dev/null
+++ b/.github/workflows/run-itcase.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run ITCases
+on:
+ pull_request:
+ push:
+
+jobs:
+ build-extension:
+ name: "Run ITCases"
+ runs-on: ubuntu-latest
+ defaults:
+ run:
+ shell: bash
+ steps:
+ - name: Checkout
+ uses: actions/checkout@master
+
+ - name: Setup java
+ uses: actions/setup-java@v2
+ with:
+ distribution: adopt
+ java-version: '8'
+
+ - name: Run ITCases
+ run: |
+ cd flink-doris-connector && mvn test -Dtest="*ITCase"
+
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 54dc7ba..5057099 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -220,34 +220,6 @@
<version>31.1-jre</version>
</dependency>
- <!--Test-->
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- <version>1.3</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>4.2.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-inline</artifactId>
- <version>4.2.0</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@@ -309,6 +281,45 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <!--Test-->
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>4.2.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>4.2.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.10.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>1.17.6</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
new file mode 100644
index 0000000..a0b7601
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+public abstract class DorisTestBase {
+ protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
+// protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_arm";
+ protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
+ private static final String DRIVER_JAR =
+ "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+ private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ private static final String URL = "jdbc:mysql://%s:9030";
+ protected static final String USERNAME = "root";
+ protected static final String PASSWORD = "";
+ protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
+ protected static Connection connection;
+ protected static final int DEFAULT_PARALLELISM = 4;
+
+ protected static String getFenodes(){
+ return DORIS_CONTAINER.getHost() + ":8030";
+ }
+
+ @BeforeClass
+ public static void startContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
+ given().ignoreExceptions()
+ .await()
+ .atMost(120, TimeUnit.SECONDS)
+ .untilAsserted(DorisTestBase::initializeJdbcConnection);
+ LOG.info("Containers are started.");
+ }
+
+ @AfterClass
+ public static void stopContainers() {
+ LOG.info("Stopping containers...");
+ DORIS_CONTAINER.stop();
+ LOG.info("Containers are stopped.");
+ }
+
+ public static GenericContainer createDorisContainer(){
+ GenericContainer container = new GenericContainer<>(DORIS_12_DOCKER_IMAGE)
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases("DorisContainer")
+ .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
+ .withEnv("FE_ID", "1")
+ .withEnv("CURRENT_BE_IP", "127.0.0.1")
+ .withEnv("CURRENT_BE_PORT", "9050")
+ .withCommand("ulimit -n 65536")
+ .withCreateContainerCmdModifier(
+ cmd -> cmd.getHostConfig().withMemorySwap(0L))
+ .withPrivilegedMode(true)
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DORIS_12_DOCKER_IMAGE)));
+
+ container.setPortBindings(
+ Lists.newArrayList(
+ String.format("%s:%s", "8030", "8030"),
+ String.format("%s:%s", "9030", "9030"),
+ String.format("%s:%s", "9060", "9060"),
+ String.format("%s:%s", "8040", "8040")));
+
+ return container;
+ }
+
+ protected static void initializeJdbcConnection()
+ throws SQLException, MalformedURLException {
+ URLClassLoader urlClassLoader =
+ new URLClassLoader(
+ new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
+ LOG.info("Try to connect to Doris...");
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ connection = DriverManager.getConnection(String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
+ try (Statement statement = connection.createStatement()) {
+ ResultSet resultSet;
+ do {
+ LOG.info("Wait for the Backend to start successfully...");
+ resultSet = statement.executeQuery("show backends");
+ } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+ }
+ LOG.info("Connected to Doris successfully...");
+ }
+
+ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
+ if (rs.next()) {
+ String isAlive = rs.getString(10).trim();
+ String totalCap = rs.getString(16).trim();
+ LockSupport.parkNanos(duration.toNanos());
+ return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap);
+ }
+ return false;
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
new file mode 100644
index 0000000..a0be145
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * DorisSink ITCase with csv and arrow format
+ */
+public class DorisSinkITCase extends DorisTestBase {
+ static final String DATABASE = "test";
+ static final String TABLE_CSV = "tbl_csv";
+ static final String TABLE_JSON = "tbl_json";
+ static final String TABLE_JSON_TBL = "tbl_json_tbl";
+
+ @Test
+ public void testSinkCsvFormat() throws Exception {
+ initializeTable(TABLE_CSV);
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("line_delimiter", "\n");
+ properties.setProperty("format", "csv");
+ submitJob(TABLE_CSV, properties, new String[]{"doris,1"});
+
+ Thread.sleep(10000);
+ Set<List<Object>> actual = new HashSet<>();
+ try (Statement sinkStatement = connection.createStatement()) {
+ ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_CSV));
+ while (sinkResultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+ sinkResultSet.getString("name"),
+ sinkResultSet.getInt("age"));
+ actual.add(row);
+ }
+ }
+ Set<List<Object>> expected =
+ Stream.<List<Object>>of(Arrays.asList("doris", 1))
+ .collect(Collectors.toSet());
+ Assertions.assertIterableEquals(expected, actual);
+ }
+
+ @Test
+ public void testSinkJsonFormat() throws Exception {
+ initializeTable(TABLE_JSON);
+ Properties properties = new Properties();
+ properties.setProperty("read_json_by_line", "true");
+ properties.setProperty("format", "json");
+
+ //mock data
+ Map<String, Object> row1 = new HashMap<>();
+ row1.put("name", "doris1");
+ row1.put("age", 1);
+ Map<String, Object> row2 = new HashMap<>();
+ row2.put("name", "doris2");
+ row2.put("age", 2);
+
+ submitJob(TABLE_JSON, properties, new String[]{new ObjectMapper().writeValueAsString(row1), new ObjectMapper().writeValueAsString(row2)});
+
+ Thread.sleep(10000);
+ Set<List<Object>> actual = new HashSet<>();
+ try (Statement sinkStatement = connection.createStatement()) {
+ ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON));
+ while (sinkResultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+ sinkResultSet.getString("name"),
+ sinkResultSet.getInt("age"));
+ actual.add(row);
+ }
+ }
+ Set<List<Object>> expected =
+ Stream.<List<Object>>of(Arrays.asList("doris1", 1),Arrays.asList("doris2", 2))
+ .collect(Collectors.toSet());
+ Assertions.assertIterableEquals(expected, actual);
+ }
+
+ public void submitJob(String table, Properties properties, String[] records) throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ DorisSink.Builder<String> builder = DorisSink.builder();
+ final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes(getFenodes())
+ .setTableIdentifier(DATABASE + "." + table)
+ .setUsername(USERNAME)
+ .setPassword(PASSWORD);
+ DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+ executionBuilder.setLabelPrefix(UUID.randomUUID().toString())
+ .setStreamLoadProp(properties);
+
+ builder.setDorisReadOptions(readOptionBuilder.build())
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(new SimpleStringSerializer())
+ .setDorisOptions(dorisBuilder.build());
+
+ env.fromElements(records).sinkTo(builder.build());
+ env.execute();
+ }
+
+
+ @Test
+ public void testTableSinkJsonFormat() throws Exception {
+ initializeTable(TABLE_JSON_TBL);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sinkDDL =
+ String.format(
+ "CREATE TABLE doris_sink ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'sink.properties.format' = 'json',"
+ + " 'sink.properties.read_json_by_line' = 'true',"
+ + " 'sink.label-prefix' = 'doris_sink'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_JSON_TBL,
+ USERNAME,
+ PASSWORD);
+ tEnv.executeSql(sinkDDL);
+ tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
+
+ Thread.sleep(10000);
+ Set<List<Object>> actual = new HashSet<>();
+ try (Statement sinkStatement = connection.createStatement()) {
+ ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL));
+ while (sinkResultSet.next()) {
+ List<Object> row =
+ Arrays.asList(
+ sinkResultSet.getString("name"),
+ sinkResultSet.getInt("age"));
+ actual.add(row);
+ }
+ }
+ Set<List<Object>> expected =
+ Stream.<List<Object>>of(Arrays.asList("doris", 1),Arrays.asList("flink", 2))
+ .collect(Collectors.toSet());
+ Assertions.assertIterableEquals(expected, actual);
+ }
+
+
+ private void initializeTable(String table) throws Exception {
+ try(Statement statement = connection.createStatement()){
+ statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+ statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
+ "`name` varchar(256),\n" +
+ "`age` int\n" +
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\"\n" +
+ ")\n", DATABASE, table));
+ }
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
new file mode 100644
index 0000000..83e959e
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * DorisSource ITCase
+ */
+public class DorisSourceITCase extends DorisTestBase {
+ static final String DATABASE = "test";
+ static final String TABLE_READ = "tbl_read";
+ static final String TABLE_READ_TBL = "tbl_read_tbl";
+
+ @Test
+ public void testSource() throws Exception {
+ initializeTable(TABLE_READ);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes(getFenodes())
+ .setTableIdentifier(DATABASE + "." + TABLE_READ)
+ .setUsername(USERNAME)
+ .setPassword(PASSWORD);
+
+ DorisSource<List<?>> source = DorisSource.<List<?>>builder()
+ .setDorisReadOptions(readOptionBuilder.build())
+ .setDorisOptions(dorisBuilder.build())
+ .setDeserializer(new SimpleListDeserializationSchema())
+ .build();
+ List<Object> actual = new ArrayList<>();
+ try(CloseableIterator<List<?>> iterator =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source")
+ .executeAndCollect()){
+ while (iterator.hasNext()) {
+ actual.add(iterator.next());
+ }
+ }
+ List<Object> expected = Arrays.asList(Arrays.asList("doris", 18), Arrays.asList("flink", 10));
+ Assertions.assertIterableEquals(expected, actual);;
+ }
+
+ @Test
+ public void testTableSource() throws Exception {
+ initializeTable(TABLE_READ_TBL);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE doris_source ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_READ_TBL,
+ USERNAME,
+ PASSWORD);
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
+
+ List<Object> actual = new ArrayList<>();
+ try(CloseableIterator<Row> iterator = tableResult.collect()){
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+ String[] expected =
+ new String[] {
+ "+I[doris, 18]",
+ "+I[flink, 10]"
+ };
+ Assertions.assertIterableEquals(Arrays.asList(expected), actual);
+ }
+
+ private void initializeTable(String table) throws Exception {
+ try(Statement statement = connection.createStatement()){
+ statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+ statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+ statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
+ "`name` varchar(256),\n" +
+ "`age` int\n" +
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\"\n" +
+ ")\n", DATABASE, table));
+ statement.execute(String.format("insert into %s.%s values ('doris',18)", DATABASE, table));
+ statement.execute(String.format("insert into %s.%s values ('flink',10)", DATABASE, table));
+ }
+ }
+}