blob: 11bf61d0fc967ec7eb4bbe3f9817baa1a0cf651d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog.DEFAULT_DATABASE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.assertj.core.api.Assertions.assertThat;
/** E2E test for {@link CrateDBCatalog}. */
class CrateDBCatalogITCase extends CrateDBCatalogTestBase {
private TableEnvironment tEnv;
@BeforeEach
void setup() {
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
// use CrateDB catalog
tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
tEnv.useCatalog(TEST_CATALOG_NAME);
}
@Test
void testSelectField() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select id from %s", TABLE1))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testWithoutSchema() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE1))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testWithSchema() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
String.format(
"select * from `%s`",
CrateDBTablePath.fromFlinkTableName(TABLE1)))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testFullPath() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
String.format(
"select * from %s.%s.`%s`",
TEST_CATALOG_NAME,
DEFAULT_DATABASE,
CrateDBTablePath.fromFlinkTableName(TABLE1)))
.execute()
.collect());
assertThat(results).hasToString("[+I[1]]");
}
@Test
void testGroupByInsert() throws Exception {
tEnv.executeSql(
String.format(
"insert into `%s`"
+ "select `int`, `short`, max(`long`), max(`real`), "
+ "max(`double`), max(`boolean`), "
+ "max(`text`), max(`timestamp`) "
+ "from `%s` group by `int`, `short`",
TABLE_TARGET_PRIMITIVE, TABLE_PRIMITIVE_TYPE))
.await();
executeSQL(String.format("REFRESH TABLE doc.%s", TABLE_TARGET_PRIMITIVE));
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from `%s`", TABLE_TARGET_PRIMITIVE))
.execute()
.collect());
assertThat(results)
.hasToString("[+I[1, 3, 4, 5.5, 6.6, true, b, 2016-06-22T19:10:25.123]]");
}
@Test
void testPrimitiveTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))
.execute()
.collect());
assertThat(results)
.hasToString(
"[+I[1, 3, 3, 4, 4, 5.5, 5.5, 6.6, 6.6, true, a, b, c, d , e, 192.168.0.100, 2016-06-22T19:10:25.123]]");
}
@Test
void testArrayTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE))
.execute()
.collect());
assertThat(results)
.hasToString(
"[+I[[1, 2, 3], [3, 4, 5], [3, 4, 5], [4, 5, 6], [4, 5, 6], [5.5, 6.6, 7.7], [5.5, 6.6, 7.7], [6.6, 7.7, 8.8], [6.6, 7.7, 8.8], [true, false, true], [a, b, c], [a, b, c], [b, c, d], [b , c , d ], [b, c, d], [0:0:0:0:0:ffff:c0a8:64, 10.2.5.28, 127.0.0.6], [2016-06-22T19:10:25.123, 2019-06-22T11:22:33.987], null]]");
}
}