/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.hive;

import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTest;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.types.Row;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.sql.Timestamp;
import java.util.Arrays;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
 * Tests temporal joins of Hive tables.
 *
 * <p>Defining a primary key is only supported since Hive 3.0.0, so the test is skipped for other
 * versions and only runs against Hive 3.1.1. To run this test, use the maven command: mvn test
 * -Phive-3.1.1 -Dtest=org.apache.flink.connectors.hive.HiveTemporalJoinITCase
 */
class HiveTemporalJoinITCase extends TableTestBase {

    private static TableEnvironment tableEnv;
    private static HiveCatalog hiveCatalog;

    @BeforeAll
    static void setup() {
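        // skip entirely when the Hive version does not support primary key constraints (< 3.0.0)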
        if (!HiveVersionTestUtil.HIVE_310_OR_LATER) {
            return;
        }
        tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        hiveCatalog = HiveTestUtils.createHiveCatalog(CatalogTest.TEST_CATALOG_NAME, "3.1.3");
        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tableEnv.useCatalog(hiveCatalog.getName());
        // create probe table
        TestCollectionTableFactory.initData(
                Arrays.asList(
                        Row.of(1, "a", Timestamp.valueOf("1970-01-01 00:00:00.001")),
                        Row.of(1, "c", Timestamp.valueOf("1970-01-01 00:00:00.002")),
                        Row.of(2, "b", Timestamp.valueOf("1970-01-01 00:00:00.003")),
                        Row.of(2, "c", Timestamp.valueOf("1970-01-01 00:00:00.004")),
                        Row.of(3, "c", Timestamp.valueOf("1970-01-01 00:00:00.005")),
                        Row.of(4, "d", Timestamp.valueOf("1970-01-01 00:00:00.006"))));
        tableEnv.executeSql(
                "create table default_catalog.default_database.probe ("
                        + " x int,"
                        + " y string,"
                        + " rowtime timestamp(3),"
                        + " p as proctime(),"
                        + " watermark for rowtime as rowtime) "
                        + "with ('connector'='COLLECTION','is-bounded' = 'false')");
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(
                String.format(
                        "create table build ("
                                + " x int, "
                                + " y string, "
                                + " z int, "
                                + " primary key(x,y) disable novalidate rely)"
                                + " tblproperties ('%s' = 'true', '%s'='5min')",
                        HiveOptions.STREAMING_SOURCE_ENABLE.key(),
                        HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key()));
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    }

    @Test
    void testProcTimeTemporalJoinHiveTable() throws Exception {
        if (!HiveVersionTestUtil.HIVE_310_OR_LATER) {
            return;
        }
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into build values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)")
                .await();
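        // a processing-time temporal join against the streaming Hive table is expected to fail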
        assertThatThrownBy(
                        () ->
                                tableEnv.executeSql(
                                        "select p.x, p.y, b.z from "
                                                + " default_catalog.default_database.probe as p "
                                                + " join build for system_time as of p.p as b on p.x=b.x and p.y=b.y"))
                .hasMessageContaining("Processing-time temporal join is not supported yet.")
                .isInstanceOf(TableException.class);
    }

    @Test
    void testRowTimeTemporalJoinHiveTable() throws Exception {
        if (!HiveVersionTestUtil.HIVE_310_OR_LATER) {
            return;
        }
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("insert into build values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)")
                .await();
        // a streaming Hive table does not support defining a watermark, so the event-time join fails
        assertThatThrownBy(
                        () ->
                                tableEnv.executeSql(
                                        "select p.x, p.y, b.z from "
                                                + " default_catalog.default_database.probe as p "
                                                + " join build for system_time as of p.rowtime as b on p.x=b.x and p.y=b.y"))
                .hasMessageContaining(
                        "Event-Time Temporal Table Join requires both primary key"
                                + " and row time attribute in versioned table, but no row time attribute can be found.")
                .isInstanceOf(ValidationException.class);
    }

    @AfterAll
    static void tearDown() {
        if (!HiveVersionTestUtil.HIVE_310_OR_LATER) {
            return;
        }
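        // drop the Hive-side build table and release catalog resources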
tableEnv.executeSql("drop table build");
if (hiveCatalog != null) {
hiveCatalog.close();
}
}
}