[Test] add itcases for DorisSource and Dorissink (#270)

Add integration tests and add them to the ci
diff --git a/.github/workflows/run-itcase.yml b/.github/workflows/run-itcase.yml
new file mode 100644
index 0000000..624ccaa
--- /dev/null
+++ b/.github/workflows/run-itcase.yml
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Run ITCases
+on:
+  pull_request:
+  push:
+
+jobs:
+  build-extension:
+    name: "Run ITCases"
+    runs-on: ubuntu-latest
+    defaults:
+      run:
+        shell: bash
+    steps:
+    - name: Checkout
+      uses: actions/checkout@master
+
+    - name: Setup java
+      uses: actions/setup-java@v2
+      with:
+        distribution: adopt
+        java-version: '8'
+
+    - name: Run ITCases
+      run: |
+        cd flink-doris-connector && mvn test -Dtest="*ITCase"
+
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 54dc7ba..5057099 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -220,34 +220,6 @@
             <version>31.1-jre</version>
         </dependency>
 
-        <!--Test-->
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-core</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>4.2.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-inline</artifactId>
-            <version>4.2.0</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.11</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
@@ -309,6 +281,45 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <!--Test-->
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>4.2.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>4.2.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>5.10.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
new file mode 100644
index 0000000..a0b7601
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+public abstract class DorisTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
+//    protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_arm";
+    protected static final String DORIS_12_DOCKER_IMAGE = "adamlee489/doris:1.2.7.1_x86";
+    private static final String DRIVER_JAR =
+            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    private static final String URL = "jdbc:mysql://%s:9030";
+    protected static final String USERNAME = "root";
+    protected static final String PASSWORD = "";
+    protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
+    protected static Connection connection;
+    protected static final int DEFAULT_PARALLELISM = 4;
+
+    protected static String getFenodes(){
+        return DORIS_CONTAINER.getHost() + ":8030";
+    }
+
+    @BeforeClass
+    public static void startContainers() {
+        LOG.info("Starting containers...");
+        Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
+        given().ignoreExceptions()
+                .await()
+                .atMost(120, TimeUnit.SECONDS)
+                .untilAsserted(DorisTestBase::initializeJdbcConnection);
+        LOG.info("Containers are started.");
+    }
+
+    @AfterClass
+    public static void stopContainers() {
+        LOG.info("Stopping containers...");
+        DORIS_CONTAINER.stop();
+        LOG.info("Containers are stopped.");
+    }
+
+    public static GenericContainer createDorisContainer(){
+        GenericContainer container =  new GenericContainer<>(DORIS_12_DOCKER_IMAGE)
+                        .withNetwork(Network.newNetwork())
+                        .withNetworkAliases("DorisContainer")
+                        .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
+                        .withEnv("FE_ID", "1")
+                        .withEnv("CURRENT_BE_IP", "127.0.0.1")
+                        .withEnv("CURRENT_BE_PORT", "9050")
+                        .withCommand("ulimit -n 65536")
+                        .withCreateContainerCmdModifier(
+                                cmd -> cmd.getHostConfig().withMemorySwap(0L))
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DORIS_12_DOCKER_IMAGE)));
+
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", "8030", "8030"),
+                        String.format("%s:%s", "9030", "9030"),
+                        String.format("%s:%s", "9060", "9060"),
+                        String.format("%s:%s", "8040", "8040")));
+
+        return container;
+    }
+
+    protected static void initializeJdbcConnection()
+            throws SQLException, MalformedURLException {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
+        LOG.info("Try to connect to Doris...");
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        connection = DriverManager.getConnection(String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
+        try (Statement statement = connection.createStatement()) {
+            ResultSet resultSet;
+            do {
+                LOG.info("Wait for the Backend to start successfully...");
+                resultSet = statement.executeQuery("show backends");
+            } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+        }
+        LOG.info("Connected to Doris successfully...");
+    }
+
+    private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
+        if (rs.next()) {
+            String isAlive = rs.getString(10).trim();
+            String totalCap = rs.getString(16).trim();
+            LockSupport.parkNanos(duration.toNanos());
+            return "true".equalsIgnoreCase(isAlive) && !"0.000".equalsIgnoreCase(totalCap);
+        }
+        return false;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
new file mode 100644
index 0000000..a0be145
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * DorisSink ITCase with csv and arrow format
+ */
+public class DorisSinkITCase extends DorisTestBase {
+    static final String DATABASE = "test";
+    static final String TABLE_CSV = "tbl_csv";
+    static final String TABLE_JSON = "tbl_json";
+    static final String TABLE_JSON_TBL = "tbl_json_tbl";
+
+    @Test
+    public void testSinkCsvFormat() throws Exception {
+        initializeTable(TABLE_CSV);
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        submitJob(TABLE_CSV, properties, new String[]{"doris,1"});
+
+        Thread.sleep(10000);
+        Set<List<Object>> actual = new HashSet<>();
+        try (Statement sinkStatement = connection.createStatement()) {
+            ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_CSV));
+            while (sinkResultSet.next()) {
+                List<Object> row =
+                        Arrays.asList(
+                                sinkResultSet.getString("name"),
+                                sinkResultSet.getInt("age"));
+                actual.add(row);
+            }
+        }
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(Arrays.asList("doris", 1))
+                        .collect(Collectors.toSet());
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+    @Test
+    public void testSinkJsonFormat() throws Exception {
+        initializeTable(TABLE_JSON);
+        Properties properties = new Properties();
+        properties.setProperty("read_json_by_line", "true");
+        properties.setProperty("format", "json");
+
+        //mock data
+        Map<String, Object> row1 = new HashMap<>();
+        row1.put("name", "doris1");
+        row1.put("age", 1);
+        Map<String, Object> row2 = new HashMap<>();
+        row2.put("name", "doris2");
+        row2.put("age", 2);
+
+        submitJob(TABLE_JSON, properties, new String[]{new ObjectMapper().writeValueAsString(row1), new ObjectMapper().writeValueAsString(row2)});
+
+        Thread.sleep(10000);
+        Set<List<Object>> actual = new HashSet<>();
+        try (Statement sinkStatement = connection.createStatement()) {
+            ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON));
+            while (sinkResultSet.next()) {
+                List<Object> row =
+                        Arrays.asList(
+                                sinkResultSet.getString("name"),
+                                sinkResultSet.getInt("age"));
+                actual.add(row);
+            }
+        }
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(Arrays.asList("doris1", 1),Arrays.asList("doris2", 2))
+                        .collect(Collectors.toSet());
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+    public void submitJob(String table, Properties properties, String[] records) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes(getFenodes())
+                .setTableIdentifier(DATABASE + "." + table)
+                .setUsername(USERNAME)
+                .setPassword(PASSWORD);
+        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix(UUID.randomUUID().toString())
+                .setStreamLoadProp(properties);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        env.fromElements(records).sinkTo(builder.build());
+        env.execute();
+    }
+
+
+    @Test
+    public void testTableSinkJsonFormat() throws Exception {
+        initializeTable(TABLE_JSON_TBL);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.properties.format' = 'json',"
+                                + " 'sink.properties.read_json_by_line' = 'true',"
+                                + " 'sink.label-prefix' = 'doris_sink'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_JSON_TBL,
+                        USERNAME,
+                        PASSWORD);
+        tEnv.executeSql(sinkDDL);
+        tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
+
+        Thread.sleep(10000);
+        Set<List<Object>> actual = new HashSet<>();
+        try (Statement sinkStatement = connection.createStatement()) {
+            ResultSet sinkResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL));
+            while (sinkResultSet.next()) {
+                List<Object> row =
+                        Arrays.asList(
+                                sinkResultSet.getString("name"),
+                                sinkResultSet.getInt("age"));
+                actual.add(row);
+            }
+        }
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(Arrays.asList("doris", 1),Arrays.asList("flink", 2))
+                        .collect(Collectors.toSet());
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+
+    private void initializeTable(String table) throws Exception {
+        try(Statement statement = connection.createStatement()){
+            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+            statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
+                    "`name` varchar(256),\n" +
+                    "`age` int\n" +
+                    ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+                    "PROPERTIES (\n" +
+                    "\"replication_num\" = \"1\"\n" +
+                    ")\n", DATABASE, table));
+        }
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
new file mode 100644
index 0000000..83e959e
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.source;
+
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.DorisTestBase;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * DorisSource ITCase
+ */
+public class DorisSourceITCase extends DorisTestBase {
+    static final String DATABASE = "test";
+    static final String TABLE_READ = "tbl_read";
+    static final String TABLE_READ_TBL = "tbl_read_tbl";
+
+    @Test
+    public void testSource() throws Exception {
+        initializeTable(TABLE_READ);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes(getFenodes())
+                .setTableIdentifier(DATABASE + "." + TABLE_READ)
+                .setUsername(USERNAME)
+                .setPassword(PASSWORD);
+
+        DorisSource<List<?>> source = DorisSource.<List<?>>builder()
+                .setDorisReadOptions(readOptionBuilder.build())
+                .setDorisOptions(dorisBuilder.build())
+                .setDeserializer(new SimpleListDeserializationSchema())
+                .build();
+        List<Object> actual = new ArrayList<>();
+        try(CloseableIterator<List<?>> iterator =
+                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Doris Source")
+                            .executeAndCollect()){
+            while (iterator.hasNext()) {
+                actual.add(iterator.next());
+            }
+        }
+        List<Object> expected = Arrays.asList(Arrays.asList("doris", 18), Arrays.asList("flink", 10));
+        Assertions.assertIterableEquals(expected, actual);;
+    }
+
+    @Test
+    public void testTableSource() throws Exception {
+        initializeTable(TABLE_READ_TBL);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL,
+                        USERNAME,
+                        PASSWORD);
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("SELECT * FROM doris_source");
+
+        List<Object> actual = new ArrayList<>();
+        try(CloseableIterator<Row> iterator = tableResult.collect()){
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+        String[] expected =
+                new String[] {
+                        "+I[doris, 18]",
+                        "+I[flink, 10]"
+                };
+        Assertions.assertIterableEquals(Arrays.asList(expected), actual);
+    }
+
+    private void initializeTable(String table) throws Exception {
+        try(Statement statement = connection.createStatement()){
+            statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+            statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
+            statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
+                    "`name` varchar(256),\n" +
+                    "`age` int\n" +
+                    ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
+                    "PROPERTIES (\n" +
+                    "\"replication_num\" = \"1\"\n" +
+                    ")\n", DATABASE, table));
+            statement.execute(String.format("insert into %s.%s  values ('doris',18)", DATABASE, table));
+            statement.execute(String.format("insert into %s.%s  values ('flink',10)", DATABASE, table));
+        }
+    }
+}