blob: a5ffb01800575b2eb9e20d93106f72e4f48c6f42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.e2e.spark.v2.jdbc;
import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.e2e.spark.SparkContainer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.net.URL;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * End-to-end test for the SeaTunnel JDBC connector on Spark against a MySQL database.
 *
 * <p>Before every test a fresh MySQL container is started, the tables described in
 * {@code /jdbc/init_sql/mysql_init.conf} are created, and the {@code source} table is filled with
 * the rows produced by {@link #generateTestDataset()}. Each test then submits a job configuration
 * through {@code executeSeaTunnelSparkJob} and verifies the contents written to the sink side.
 * The container is stopped again after every test.
 */
public class JdbcMysqlIT extends SparkContainer {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMysqlIT.class);

    /** MySQL container that backs the current test; recreated for every test method. */
    private MySQLContainer<?> mc;

    /** Parsed content of {@code mysql_init.conf}; holds the DDL/DML/check SQL statements. */
    private Config config;

    /**
     * Starts the MySQL container and prepares the tables and test data.
     *
     * <p>NOTE: the method name mentions PostgreSQL for historical reasons, but it starts MySQL.
     * The name is left unchanged so that anything referring to it keeps working.
     *
     * @throws Exception if the container cannot be started or the JDBC driver class cannot be loaded
     */
    @SuppressWarnings("checkstyle:MagicNumber")
    @BeforeEach
    public void startPostgreSqlContainer() throws Exception {
        // Run as root: a non-root user would need the XA_RECOVER_ADMIN privilege
        // when the job is configured with is_exactly_once = "true".
        mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
            .withNetwork(NETWORK)
            .withNetworkAliases("mysql")
            .withUsername("root")
            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
        Startables.deepStart(Stream.of(mc)).join();
        LOGGER.info("Mysql container started");
        Class.forName(mc.getDriverClassName());
        // Retry the table initialization (exceptions are ignored between attempts),
        // polling every 500 ms and giving up after 5 seconds.
        given().ignoreExceptions()
            .await()
            .atLeast(100, TimeUnit.MILLISECONDS)
            .pollInterval(500, TimeUnit.MILLISECONDS)
            .atMost(5, TimeUnit.SECONDS)
            .untilAsserted(this::initializeJdbcTable);
        batchInsertData();
    }

    /**
     * Loads {@code /jdbc/init_sql/mysql_init.conf} from the classpath, validates that all required
     * keys are present and executes the table creation and type-table insert statements.
     *
     * @throws IllegalArgumentException if the init configuration is not on the classpath
     * @throws RuntimeException if any of the SQL statements fails
     */
    private void initializeJdbcTable() {
        final String initConfPath = "/jdbc/init_sql/mysql_init.conf";
        URL resource = JdbcMysqlIT.class.getResource(initConfPath);
        if (resource == null) {
            throw new IllegalArgumentException("can't find file " + initConfPath);
        }
        config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
            "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
        try (Connection connection = DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), mc.getPassword());
             Statement statement = connection.createStatement()) {
            statement.execute(config.getString("source_table"));
            statement.execute(config.getString("sink_table"));
            statement.execute(config.getString("type_source_table"));
            statement.execute(config.getString("type_sink_table"));
            statement.execute(config.getString("insert_type_source_table_sql"));
        } catch (SQLException e) {
            throw new RuntimeException("Initializing Mysql table failed!", e);
        }
    }

    /**
     * Inserts every row of {@link #generateTestDataset()} into the {@code source} table using a
     * single JDBC batch that is committed in one transaction.
     *
     * @throws RuntimeException if the batch insert fails
     */
    @SuppressWarnings("checkstyle:MagicNumber")
    private void batchInsertData() {
        String sql = "insert into source(name, age) values(?,?)";
        try (Connection connection = DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), mc.getPassword())) {
            connection.setAutoCommit(false);
            try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                for (List<Object> row : generateTestDataset()) {
                    preparedStatement.setString(1, (String) row.get(0));
                    preparedStatement.setInt(2, (Integer) row.get(1));
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
            }
            connection.commit();
        } catch (SQLException e) {
            throw new RuntimeException("Batch insert data failed!", e);
        }
    }

    /** Runs the plain source-to-sink job and expects the sink to equal the generated dataset. */
    @Test
    public void testJdbcMysqlSourceAndSink() throws Exception {
        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
        Assertions.assertIterableEquals(generateTestDataset(), queryResult());
    }

    /** Runs the parallel-read job and compares the sink, ordered by age, with the dataset. */
    @Test
    public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_parallel.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
        // Sorting is required: rows are read in parallel, so they may arrive out of order.
        Assertions.assertIterableEquals(generateTestDataset(), sortedByAge(queryResult()));
    }

    /**
     * Runs the parallel-read job restricted by lower/upper bounds and expects only the first
     * 50 generated rows in the sink.
     */
    @Test
    public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws Exception {
        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
        // Sorting is required: rows are read in parallel, so they may arrive out of order.
        List<List<Object>> sortedResult = sortedByAge(queryResult());
        // lower=1 upper=50
        List<List<Object>> limit50 = generateTestDataset().stream().limit(50).collect(Collectors.toList());
        Assertions.assertIterableEquals(limit50, sortedResult);
    }

    /** Runs the XA job configuration and expects the sink to equal the generated dataset. */
    @Test
    public void testJdbcMysqlSourceAndSinkXA() throws Exception {
        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_xa.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
        Assertions.assertIterableEquals(generateTestDataset(), queryResult());
    }

    /** Runs the data-type job and validates the type sink table via the configured check query. */
    @Test
    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_mysql_source_and_sink_datatype.conf");
        Assertions.assertEquals(0, execResult.getExitCode());
        checkSinkDataTypeTable();
    }

    /**
     * Executes the {@code check_type_sink_table_sql} statement from the init configuration and
     * asserts that the first column of its first row equals 2.
     *
     * @throws Exception if the query cannot be executed
     */
    private void checkSinkDataTypeTable() throws Exception {
        try (Connection connection = DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), mc.getPassword());
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql"))) {
            // Fail with a clear assertion instead of an SQLException when the query yields no row.
            Assertions.assertTrue(resultSet.next(), "check_type_sink_table_sql returned no rows");
            // Expected value first, actual value second, so failure messages read correctly.
            Assertions.assertEquals(2, resultSet.getInt(1));
        }
    }

    /**
     * Reads all rows of the {@code sink} table.
     *
     * @return one {@code [name, age]} list per row, in the order returned by the database
     * @throws RuntimeException if the query fails
     */
    private List<List<Object>> queryResult() {
        List<List<Object>> result = new ArrayList<>();
        String sql = "select name , age from sink ";
        try (Connection connection = DriverManager.getConnection(mc.getJdbcUrl(), mc.getUsername(), mc.getPassword());
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(sql)) {
            while (resultSet.next()) {
                result.add(
                    Arrays.<Object>asList(
                        resultSet.getString(1),
                        resultSet.getInt(2)
                    )
                );
            }
        } catch (SQLException e) {
            throw new RuntimeException("Query result data failed!", e);
        }
        return result;
    }

    /**
     * Returns a copy of the given rows ordered by their age column (index 1).
     *
     * @param rows rows in {@code [name, age]} form
     * @return a new list sorted ascending by age
     */
    private static List<List<Object>> sortedByAge(List<List<Object>> rows) {
        return rows.stream()
            .sorted(Comparator.comparing((List<Object> row) -> (Integer) row.get(1)))
            .collect(Collectors.toList());
    }

    /**
     * Builds the reference dataset: 1000 rows of the form {@code ["user_<i>", i]} for
     * {@code i} from 1 to 1000, in ascending order.
     *
     * @return the generated rows
     */
    private static List<List<Object>> generateTestDataset() {
        List<List<Object>> rows = new ArrayList<>();
        for (int i = 1; i <= 1000; i++) {
            rows.add(
                Arrays.<Object>asList(
                    String.format("user_%s", i),
                    i
                ));
        }
        return rows;
    }

    /**
     * Stops the MySQL container if one was started.
     *
     * <p>NOTE: the method name mentions PostgreSQL for historical reasons, but it stops MySQL.
     */
    @AfterEach
    public void closePostgreSqlContainer() {
        if (mc != null) {
            mc.stop();
        }
    }
}