blob: 29c4c884f63ddf30c1ad6886bf35e66f656f9078 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.databases.postgres.catalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog.DEFAULT_DATABASE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.assertj.core.api.Assertions.assertThat;
/** E2E test for {@link PostgresCatalog}. */
class PostgresCatalogITCase extends PostgresCatalogTestBase {
private TableEnvironment tEnv;
@BeforeEach
void setup() {
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
// use PG catalog
tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tEnv.useCatalog(TEST_CATALOG_NAME);
}
@Test
void testSelectField() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select id from %s", TABLE1))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testWithoutSchema() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE1))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testWithSchema() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
String.format(
"select * from `%s`",
PostgresTablePath.fromFlinkTableName(TABLE1)))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testFullPath() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
String.format(
"select * from %s.%s.`%s`",
TEST_CATALOG_NAME,
DEFAULT_DATABASE,
PostgresTablePath.fromFlinkTableName(TABLE1)))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testInsert() throws Exception {
tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1)).await();
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE4))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testGroupByInsert() throws Exception {
tEnv.executeSql(
String.format(
"insert into `%s` "
+ "select `int`, cast('41' as bytes), `short`, max(`long`), max(`real`), "
+ "max(`double_precision`), max(`numeric`), max(`decimal`), max(`boolean`), "
+ "max(`text`), 'B', 'C', max(`character_varying`), max(`timestamp`), "
+ "max(`date`), max(`time`), max(`default_numeric`) "
+ "from `%s` group by `int`, `short`",
TABLE_PRIMITIVE_TYPE2, TABLE_PRIMITIVE_TYPE))
.await();
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from `%s`", TABLE_PRIMITIVE_TYPE2))
.execute()
.collect());
assertThat(results)
.hasToString(
"[+I[1, [52, 49], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, B, C , d, 2016-06-22T19:10:25, 2015-01-01, 00:51:03, 500.000000000000000000]]");
}
@Test
void testPrimitiveTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))
.execute()
.collect());
assertThat(results)
.hasToString(
"[+I[1, [50], 3, 4, 5.5, 6.6, 7.70000, 8.8, true, a, b, c , d, 2016-06-22T19:10:25, 2015-01-01, 00:51:03, 500.000000000000000000]]");
}
@Test
void testArrayTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE))
.execute()
.collect());
assertThat(results)
.hasToString(
"[+I["
+ "[1, 2, 3], "
+ "[[50], [51], [52]], "
+ "[3, 4, 5], "
+ "[4, 5, 6], "
+ "[5.5, 6.6, 7.7], "
+ "[6.6, 7.7, 8.8], "
+ "[7.70000, 8.80000, 9.90000], "
+ "[8.800000000000000000, 9.900000000000000000, 10.100000000000000000], "
+ "[9.90, 10.10, 11.11], "
+ "[true, false, true], "
+ "[a, b, c], "
+ "[b, c, d], "
+ "[b , c , d ], "
+ "[b, c, d], "
+ "[2016-06-22T19:10:25, 2019-06-22T19:10:25], "
+ "[2015-01-01, 2020-01-01], "
+ "[00:51:03, 00:59:03], "
+ "null, "
+ "null]]");
}
@Test
void testSerialTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_SERIAL_TYPE))
.execute()
.collect());
assertThat(results)
.hasToString(
"[+I["
+ "32767, "
+ "2147483647, "
+ "32767, "
+ "2147483647, "
+ "9223372036854775807, "
+ "9223372036854775807]]");
}
}