/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.databases.postgres.catalog;

import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
import org.apache.flink.connector.jdbc.testutils.databases.postgres.PostgresDatabase;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.logical.DecimalType;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Test base for {@link PostgresCatalog}.
 *
 * <p>{@link #init()} provisions a {@code test} database, a {@code test_schema} schema, and a set
 * of tables covering simple, primitive, array, and serial column types; {@link #afterAll()} drops
 * them again.
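 *
 * <p>A minimal usage sketch for subclasses, assuming only the standard Flink
 * {@code Catalog#listTables(String)} API (nothing below beyond {@code catalog} is defined in this
 * class):
 *
 * <pre>{@code
 * // inside a test method of a subclass:
 * List<String> tables = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
 * // PostgresCatalog reports tables schema-qualified, e.g. "public.t1"
 * }</pre>
 */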
class PostgresCatalogTestBase implements JdbcITCaseBase, PostgresTestBase {

    private static DatabaseMetadata getStaticMetadata() {
        return PostgresDatabase.getMetadata();
    }

    protected static final String TEST_CATALOG_NAME = "mypg";
    protected static final String TEST_USERNAME = getStaticMetadata().getUsername();
    protected static final String TEST_PWD = getStaticMetadata().getPassword();
    protected static final String TEST_DB = "test";
    protected static final String TEST_SCHEMA = "test_schema";
    protected static final String TABLE1 = "t1";
    protected static final String TABLE2 = "t2";
    protected static final String TABLE3 = "t3";
    protected static final String TABLE4 = "t4";
    protected static final String TABLE5 = "t5";
    protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
    protected static final String TABLE_PRIMITIVE_TYPE2 = "primitive_table2";
    protected static final String TABLE_ARRAY_TYPE = "array_table";
    protected static final String TABLE_SERIAL_TYPE = "serial_table";

    protected static String baseUrl;
    protected static PostgresCatalog catalog;

    @BeforeAll
    static void init() throws SQLException {
        // e.g. jdbc:postgresql://localhost:50807/postgres?user=postgres
        String jdbcUrl = getStaticMetadata().getJdbcUrl();
        // keep only the host/port part, e.g. jdbc:postgresql://localhost:50807, so that
        // executeSQL(db, ...) can append an arbitrary target database
        baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));

        catalog =
                new PostgresCatalog(
                        Thread.currentThread().getContextClassLoader(),
                        TEST_CATALOG_NAME,
                        PostgresCatalog.DEFAULT_DATABASE,
                        TEST_USERNAME,
                        TEST_PWD,
                        baseUrl);

        // create test database and schema
        createDatabase(TEST_DB);
        createSchema(TEST_DB, TEST_SCHEMA);

        // create test tables
        // table: postgres.public.t1
        // table: postgres.public.t4
        // table: postgres.public.t5
        createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
        createTable(PostgresTablePath.fromFlinkTableName(TABLE4), getSimpleTable().pgSchemaSql);
        createTable(PostgresTablePath.fromFlinkTableName(TABLE5), getSimpleTable().pgSchemaSql);

        // table: test.public.t2
        // table: test.test_schema.t3
        // table: postgres.public.primitive_table
        // table: postgres.public.primitive_table2
        // table: postgres.public.array_table
        // table: postgres.public.serial_table
        createTable(
                TEST_DB,
                PostgresTablePath.fromFlinkTableName(TABLE2),
                getSimpleTable().pgSchemaSql);
        createTable(
                TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
        createTable(
                PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE),
                getPrimitiveTable().pgSchemaSql);
        createTable(
                PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2),
                getPrimitiveTable("test_pk2").pgSchemaSql);
        createTable(
                PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE),
                getArrayTable().pgSchemaSql);
        createTable(
                PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE),
                getSerialTable().pgSchemaSql);

        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "insert into public.%s values (%s);", TABLE1, getSimpleTable().values));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "insert into %s values (%s);",
                        TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "insert into %s values (%s);", TABLE_ARRAY_TYPE, getArrayTable().values));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "insert into %s values (%s);", TABLE_SERIAL_TYPE, getSerialTable().values));
    }

    @AfterAll
    static void afterAll() throws SQLException {
        executeSQL(TEST_DB, String.format("DROP SCHEMA %s CASCADE", TEST_SCHEMA));
        executeSQL(
                TEST_DB,
                String.format("DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE2)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format("DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE1)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format("DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE4)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format("DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE5)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "DROP TABLE %s ",
                        PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "DROP TABLE %s ",
                        PostgresTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE2)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "DROP TABLE %s ", PostgresTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE)));
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format(
                        "DROP TABLE %s ",
                        PostgresTablePath.fromFlinkTableName(TABLE_SERIAL_TYPE)));
    }
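
    /** Creates a table from the given DDL column list in the default {@code postgres} database. */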
    public static void createTable(PostgresTablePath tablePath, String tableSchemaSql)
            throws SQLException {
        executeSQL(
                PostgresCatalog.DEFAULT_DATABASE,
                String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
    }
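
    /** Creates a table from the given DDL column list in the given database. */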
    public static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql)
            throws SQLException {
        executeSQL(
                db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
    }
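
    /** Creates {@code schema} in the given database. */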
    public static void createSchema(String db, String schema) throws SQLException {
        executeSQL(db, String.format("CREATE SCHEMA %s", schema));
    }
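
    /** Creates {@code database} on the server. */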
    public static void createDatabase(String database) throws SQLException {
        executeSQL(String.format("CREATE DATABASE %s;", database));
    }
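
    /**
     * Executes the statement without selecting a database, so the PostgreSQL JDBC driver falls
     * back to its default database (typically the one named after the connecting user).
     */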
    public static void executeSQL(String sql) throws SQLException {
        executeSQL("", sql);
    }
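
    /**
     * Opens a short-lived connection to {@code db} and executes a single update statement, e.g.
     * {@code executeSQL(TEST_DB, "DROP SCHEMA test_schema CASCADE")}.
     */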
    public static void executeSQL(String db, String sql) throws SQLException {
        try (Connection conn =
                        DriverManager.getConnection(
                                String.format("%s/%s", baseUrl, db), TEST_USERNAME, TEST_PWD);
                Statement statement = conn.createStatement()) {
            statement.executeUpdate(sql);
        }
    }

    /**
     * Holder for a test table: the Flink {@link Schema}, the matching PostgreSQL DDL column list,
     * and one row of literal values to insert.
     */
    public static class TestTable {

        Schema schema;
        String pgSchemaSql;
        String values;

        public TestTable(Schema schema, String pgSchemaSql, String values) {
            this.schema = schema;
            this.pgSchemaSql = pgSchemaSql;
            this.values = values;
        }
    }
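
    /** A single-column table backing the plain {@code t1} to {@code t5} fixtures. */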
    public static TestTable getSimpleTable() {
        return new TestTable(
                Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1");
    }

    // Postgres does not allow reusing the same primary key constraint name across tables in one
    // schema (the backing index name must be unique), so the name is parameterized to avoid the
    // clash between the two primitive tables.
    public static TestTable getPrimitiveTable() {
        return getPrimitiveTable("test_pk");
    }

    // TODO: add back timestamptz and time types.
    // Flink currently doesn't support converting TIME precision and fails with:
    // "TableException: Unsupported conversion from data type 'TIME(6)' (conversion class:
    // java.sql.Time) to type information. Only data types that originated from type information
    // fully support a reverse conversion."
    public static TestTable getPrimitiveTable(String primaryKeyName) {
        return new TestTable(
                Schema.newBuilder()
                        .column("int", DataTypes.INT().notNull())
                        .column("bytea", DataTypes.BYTES())
                        .column("short", DataTypes.SMALLINT().notNull())
                        .column("long", DataTypes.BIGINT())
                        .column("real", DataTypes.FLOAT())
                        .column("double_precision", DataTypes.DOUBLE())
                        .column("numeric", DataTypes.DECIMAL(10, 5))
                        .column("decimal", DataTypes.DECIMAL(10, 1))
                        .column("boolean", DataTypes.BOOLEAN())
                        .column("text", DataTypes.STRING())
                        .column("char", DataTypes.CHAR(1))
                        .column("character", DataTypes.CHAR(3))
                        .column("character_varying", DataTypes.VARCHAR(20))
                        .column("timestamp", DataTypes.TIMESTAMP(5))
                        // .column("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
                        .column("date", DataTypes.DATE())
                        .column("time", DataTypes.TIME(0))
                        .column("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
                        .primaryKeyNamed(primaryKeyName, "short", "int")
                        .build(),
                "int integer, "
                        + "bytea bytea, "
                        + "short smallint, "
                        + "long bigint, "
                        + "real real, "
                        + "double_precision double precision, "
                        + "numeric numeric(10, 5), "
                        + "decimal decimal(10, 1), "
                        + "boolean boolean, "
                        + "text text, "
                        + "char char, "
                        + "character character(3), "
                        + "character_varying character varying(20), "
                        + "timestamp timestamp(5), "
                        // + "timestamptz timestamptz(4), "
                        + "date date, "
                        + "time time(0), "
                        + "default_numeric numeric, "
                        + "CONSTRAINT "
                        + primaryKeyName
                        + " PRIMARY KEY (short, int)",
                "1,"
                        + "'2',"
                        + "3,"
                        + "4,"
                        + "5.5,"
                        + "6.6,"
                        + "7.7,"
                        + "8.8,"
                        + "true,"
                        + "'a',"
                        + "'b',"
                        + "'c',"
                        + "'d',"
                        + "'2016-06-22 19:10:25',"
                        // + "'2006-06-22 19:10:25',"
                        + "'2015-01-01',"
                        + "'00:51:02.746572', "
                        + "500");
    }

    // TODO: add back timestamptz once the planner supports TIMESTAMP WITH TIME ZONE.
    public static TestTable getArrayTable() {
        return new TestTable(
                Schema.newBuilder()
                        .column("int_arr", DataTypes.ARRAY(DataTypes.INT()))
                        .column("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
                        .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
                        .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
                        .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
                        .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
                        .column("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
                        .column(
                                "numeric_arr_default",
                                DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
                        .column("decimal_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)))
                        .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
                        .column("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
                        .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
                        .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
                        .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
                        .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(5)))
                        // .column("timestamptz_arr",
                        //         DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
                        .column("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
                        .column("time_arr", DataTypes.ARRAY(DataTypes.TIME(0)))
                        .column("null_bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
                        .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING()))
                        .build(),
                "int_arr integer[], "
                        + "bytea_arr bytea[], "
                        + "short_arr smallint[], "
                        + "long_arr bigint[], "
                        + "real_arr real[], "
                        + "double_precision_arr double precision[], "
                        + "numeric_arr numeric(10, 5)[], "
                        + "numeric_arr_default numeric[], "
                        + "decimal_arr decimal(10,2)[], "
                        + "boolean_arr boolean[], "
                        + "text_arr text[], "
                        + "char_arr char[], "
                        + "character_arr character(3)[], "
                        + "character_varying_arr character varying(20)[], "
                        + "timestamp_arr timestamp(5)[], "
                        // + "timestamptz_arr timestamptz(4)[], "
                        + "date_arr date[], "
                        + "time_arr time(0)[], "
                        + "null_bytea_arr bytea[], "
                        + "null_text_arr text[]",
                "'{1,2,3}',"
                        + "'{2,3,4}',"
                        + "'{3,4,5}',"
                        + "'{4,5,6}',"
                        + "'{5.5,6.6,7.7}',"
                        + "'{6.6,7.7,8.8}',"
                        + "'{7.7,8.8,9.9}',"
                        + "'{8.8,9.9,10.10}',"
                        + "'{9.9,10.10,11.11}',"
                        + "'{true,false,true}',"
                        + "'{a,b,c}',"
                        + "'{b,c,d}',"
                        + "'{b,c,d}',"
                        + "'{b,c,d}',"
                        + "'{\"2016-06-22 19:10:25\", \"2019-06-22 19:10:25\"}',"
                        // + "'{\"2006-06-22 19:10:25\", \"2009-06-22 19:10:25\"}',"
                        + "'{\"2015-01-01\", \"2020-01-01\"}',"
                        + "'{\"00:51:02.746572\", \"00:59:02.746572\"}',"
                        + "NULL,"
                        + "NULL");
    }
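
    /**
     * Table covering the PostgreSQL serial variants; the catalog is expected to surface them as
     * NOT NULL, since the driver reports serial columns as non-nullable.
     */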
    public static TestTable getSerialTable() {
        return new TestTable(
                Schema.newBuilder()
                        // serial columns are reported as NOT NULL by the driver
                        // (ResultSetMetaData.columnNoNulls)
                        .column("f0", DataTypes.SMALLINT().notNull())
                        .column("f1", DataTypes.INT().notNull())
                        .column("f2", DataTypes.SMALLINT().notNull())
                        .column("f3", DataTypes.INT().notNull())
                        .column("f4", DataTypes.BIGINT().notNull())
                        .column("f5", DataTypes.BIGINT().notNull())
                        .build(),
                "f0 smallserial, "
                        + "f1 serial, "
                        + "f2 serial2, "
                        + "f3 serial4, "
                        + "f4 serial8, "
                        + "f5 bigserial",
                "32767,"
                        + "2147483647,"
                        + "32767,"
                        + "2147483647,"
                        + "9223372036854775807,"
                        + "9223372036854775807");
    }
}