blob: c45e0693e429ea750d49f2581194ef5a03b13c34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.e2e.flink.v2.jdbc;
import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.flink.FlinkContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JdbcPostgresIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
private PostgreSQLContainer<?> pg;
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(pg)).join();
LOGGER.info("Postgres container started");
Class.forName(pg.getDriverClassName());
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
batchInsertData();
}
private void initializeJdbcTable() {
try (Connection connection = DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(), pg.getPassword())) {
Statement statement = connection.createStatement();
String source = "create table source(\n" +
"user_id bigserial NOT NULL PRIMARY KEY,\n" +
"name char(10),\n" +
"age INT\n" +
")";
String sink = "create table sink(\n" +
"user_id bigserial NOT NULL PRIMARY KEY,\n" +
"name char(10),\n" +
"age INT\n" +
")";
statement.execute(source);
statement.execute(sink);
} catch (SQLException e) {
throw new RuntimeException("Initializing Mysql table failed!", e);
}
}
@SuppressWarnings("checkstyle:MagicNumber")
private void batchInsertData() {
String sql = "insert into source(name, age) values(?,?)";
try (Connection connection = DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(), pg.getPassword())) {
connection.setAutoCommit(false);
PreparedStatement preparedStatement = connection.prepareStatement(sql);
for (List row : generateTestDataset()) {
preparedStatement.setString(1, (String) row.get(0));
preparedStatement.setInt(2, (Integer) row.get(1));
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
connection.commit();
} catch (SQLException e) {
throw new RuntimeException("Batch insert data failed!", e);
}
}
@Test
public void testJdbcPostgresSourceAndSink() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}
@Test
public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
.collect(Collectors.toList());
Assertions.assertIterableEquals(generateTestDataset(), sortedResult);
}
@Test
public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws Exception {
Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf");
Assertions.assertEquals(0, execResult.getExitCode());
//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
.collect(Collectors.toList());
//lower=1 upper=50
List<List> limit50 = generateTestDataset().stream().limit(50).collect(Collectors.toList());
Assertions.assertIterableEquals(limit50, sortedResult);
}
@Test
public void testJdbcPostgresSourceAndSinkXA() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_xa.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}
private List<List> queryResult() {
List<List> result = new ArrayList<>();
try (Connection connection = DriverManager.getConnection(pg.getJdbcUrl(), pg.getUsername(), pg.getPassword())) {
Statement statement = connection.createStatement();
String sql = "select name , age from sink ";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
result.add(
Arrays.asList(
resultSet.getString(1).replace(" ", ""),
resultSet.getInt(2)
)
);
}
} catch (SQLException e) {
throw new RuntimeException("Query result data failed!", e);
}
return result;
}
private static List<List> generateTestDataset() {
List<List> rows = new ArrayList<>();
for (int i = 1; i <= 1000; i++) {
rows.add(
Arrays.asList(
String.format("user_%s", i),
i
));
}
return rows;
}
@AfterEach
public void closePostgreSqlContainer() {
if (pg != null) {
pg.stop();
}
}
}