blob: ebe7847c1f17f1a6c2983c434239257da3027a98 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.e2e.spark.v2.jdbc;
import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.e2e.spark.SparkContainer;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@Slf4j
public class JdbcDmdbIT extends SparkContainer {
private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
private static final String HOST = "spark_e2e_dmdb";
private static final String LOCAL_HOST = "localhost";
private static final String URL = "jdbc:dm://%s:5236";
private static final String USERNAME = "SYSDBA";
private static final String PASSWORD = "SYSDBA";
private static final String DATABASE = "SYSDBA";
private static final String SOURCE_TABLE = "e2e_table_source";
private static final String SINK_TABLE = "e2e_table_sink";
private GenericContainer<?> dbServer;
private Connection jdbcConnection;
@BeforeEach
public void beforeAllForDM() {
try {
dbServer = new GenericContainer<>(DM_DOCKER_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases(HOST)
.withLogConsumer(new Slf4jLogConsumer(log));
dbServer.setPortBindings(Lists.newArrayList("5236:5236"));
Startables.deepStart(Stream.of(dbServer)).join();
log.info("dmdb container started");
Class.forName(DRIVER_CLASS);
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initializeJdbcConnection);
initializeJdbcTable();
} catch (Exception ex) {
log.error("dm container init failed", ex);
throw new RuntimeException(ex);
}
}
@AfterEach
public void closeDmdbContainer() throws SQLException {
if (jdbcConnection != null) {
jdbcConnection.close();
}
if (dbServer != null) {
dbServer.close();
}
}
private void initializeJdbcConnection() throws SQLException {
jdbcConnection = DriverManager.getConnection(String.format(
URL, dbServer.getHost()), USERNAME, PASSWORD);
}
/**
* init the table for DM_SERVER, DDL and DML for source and sink
*/
private void initializeJdbcTable() {
URL resource = JdbcDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
if (resource == null) {
throw new IllegalArgumentException("can't find find file");
}
String file = resource.getFile();
Config config = ConfigFactory.parseFile(new File(file));
assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
try (Statement statement = jdbcConnection.createStatement()) {
// source
String sourceTableDDL = config.getString("dm_table_source");
statement.execute(sourceTableDDL);
String insertSQL = config.getString("DML");
statement.execute(insertSQL);
// sink
String sinkTableDDL = config.getString("dm_table_sink");
statement.execute(sinkTableDDL);
} catch (SQLException e) {
throw new RuntimeException("Initializing table failed!", e);
}
}
private void assertHasData(String table) {
try (Statement statement = jdbcConnection.createStatement();) {
String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
ResultSet source = statement.executeQuery(sql);
Assertions.assertTrue(source.next());
} catch (SQLException e) {
throw new RuntimeException("test dm server image error", e);
}
}
@Test
@DisplayName("JDBC-DM container can be pull")
public void testDMDBImage() {
assertHasData(SOURCE_TABLE);
}
@Test
@DisplayName("spark JDBC-DM test for all type mapper")
public void testDMDBSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_dm_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
assertHasData(SINK_TABLE);
}
}