blob: d236d578efba6207aa57e0bc728f6f5e02534542 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.table;
import org.apache.flink.connector.jdbc.databases.mysql.MySqlDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import static java.lang.String.format;
import static java.lang.String.join;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL
* to mock a DB.
*/
class UnsignedTypeConversionITCase extends AbstractTestBase implements MySqlDatabase {
private static final Logger LOGGER =
LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
private static final String TABLE_NAME = "unsigned_test";
private static final List<String> COLUMNS =
Arrays.asList(
"tiny_c",
"tiny_un_c",
"small_c",
"small_un_c",
"int_c",
"int_un_c",
"big_c",
"big_un_c");
private static final Object[] ROW =
new Object[] {
(byte) 127,
(short) 255,
(short) 32767,
65535,
2147483647,
4294967295L,
9223372036854775807L,
new BigDecimal("18446744073709551615")
};
@Test
void testUnsignedType() throws Exception {
try (Connection con = getMetadata().getConnection()) {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = StreamTableEnvironment.create(sEnv);
createMysqlTable(con);
createFlinkTable(tableEnv);
prepareData(tableEnv);
// write data to db
tableEnv.executeSql(
format("insert into jdbc_sink select %s from data", join(",", COLUMNS)))
.await();
// read data from db using jdbc connection and compare
try (PreparedStatement ps =
con.prepareStatement(
format("select %s from %s", join(",", COLUMNS), TABLE_NAME))) {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
for (int i = 0; i < ROW.length; i++) {
assertThat(resultSet.getObject(i + 1, ROW[i].getClass())).isEqualTo(ROW[i]);
}
}
}
// read data from db using flink and compare
String sql = format("select %s from jdbc_source", join(",", COLUMNS));
CloseableIterator<Row> collected = tableEnv.executeSql(sql).collect();
List<Row> result = CollectionUtil.iteratorToList(collected);
assertThat(result).containsOnly(Row.ofKind(RowKind.INSERT, ROW));
}
}
private void createMysqlTable(Connection con) throws SQLException {
try (PreparedStatement ps =
con.prepareStatement(
"create table "
+ TABLE_NAME
+ " ("
+ " tiny_c TINYINT,"
+ " tiny_un_c TINYINT UNSIGNED,"
+ " small_c SMALLINT,"
+ " small_un_c SMALLINT UNSIGNED,"
+ " int_c INTEGER ,"
+ " int_un_c INTEGER UNSIGNED,"
+ " big_c BIGINT,"
+ " big_un_c BIGINT UNSIGNED);")) {
ps.execute();
}
}
private void createFlinkTable(TableEnvironment tableEnv) {
String commonDDL =
"create table %s ("
+ "tiny_c TINYINT,"
+ "tiny_un_c SMALLINT,"
+ "small_c SMALLINT,"
+ "small_un_c INT,"
+ "int_c INT,"
+ "int_un_c BIGINT,"
+ "big_c BIGINT,"
+ "big_un_c DECIMAL(20, 0)) with("
+ " 'connector' = 'jdbc',"
+ " 'url' = '"
+ getMetadata().getJdbcUrlWithCredentials()
+ "',"
+ " 'table-name' = '"
+ TABLE_NAME
+ "'"
+ ")";
tableEnv.executeSql(format(commonDDL, "jdbc_source"));
tableEnv.executeSql(format(commonDDL, "jdbc_sink"));
}
private void prepareData(TableEnvironment tableEnv) {
Table dataTable =
tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("tiny_c", DataTypes.TINYINT().notNull()),
DataTypes.FIELD("tiny_un_c", DataTypes.SMALLINT().notNull()),
DataTypes.FIELD("small_c", DataTypes.SMALLINT().notNull()),
DataTypes.FIELD("small_un_c", DataTypes.INT().notNull()),
DataTypes.FIELD("int_c", DataTypes.INT().notNull()),
DataTypes.FIELD("int_un_c", DataTypes.BIGINT().notNull()),
DataTypes.FIELD("big_c", DataTypes.BIGINT().notNull()),
DataTypes.FIELD("big_un_c", DataTypes.DECIMAL(20, 0).notNull())),
Row.of(ROW));
tableEnv.createTemporaryView("data", dataTable);
}
}