| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connectors.hive; |
| |
| import org.apache.flink.table.HiveVersionTestUtil; |
| import org.apache.flink.table.api.EnvironmentSettings; |
| import org.apache.flink.table.api.SqlDialect; |
| import org.apache.flink.table.api.TableEnvironment; |
| import org.apache.flink.table.api.TableException; |
| import org.apache.flink.table.api.ValidationException; |
| import org.apache.flink.table.catalog.CatalogTest; |
| import org.apache.flink.table.catalog.hive.HiveCatalog; |
| import org.apache.flink.table.catalog.hive.HiveTestUtils; |
| import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; |
| import org.apache.flink.table.planner.utils.TableTestBase; |
| import org.apache.flink.types.Row; |
| |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.Test; |
| |
| import java.sql.Timestamp; |
| import java.util.Arrays; |
| |
| import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| |
| /** |
| * Test Temporal join of hive tables. |
| * |
| * <p>Defining primary key only supports since hive 3.0.0, skip other versions only test in hive |
| * 3.1.1. To run this test, please use mvn command: mvn test -Phive-3.1.1 |
| * -Dtest=org.apache.flink.connectors.hive.HiveTemporalJoinITCase |
| */ |
| class HiveTemporalJoinITCase extends TableTestBase { |
| |
| private static TableEnvironment tableEnv; |
| private static HiveCatalog hiveCatalog; |
| |
| @BeforeAll |
| static void setup() { |
| if (!HiveVersionTestUtil.HIVE_310_OR_LATER) { |
| return; |
| } |
| tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); |
| hiveCatalog = HiveTestUtils.createHiveCatalog(); |
| |
| hiveCatalog = HiveTestUtils.createHiveCatalog(CatalogTest.TEST_CATALOG_NAME, "3.1.3"); |
| |
| tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); |
| tableEnv.useCatalog(hiveCatalog.getName()); |
| // create probe table |
| TestCollectionTableFactory.initData( |
| Arrays.asList( |
| Row.of(1, "a", Timestamp.valueOf("1970-01-01 00:00:00.001")), |
| Row.of(1, "c", Timestamp.valueOf("1970-01-01 00:00:00.002")), |
| Row.of(2, "b", Timestamp.valueOf("1970-01-01 00:00:00.003")), |
| Row.of(2, "c", Timestamp.valueOf("1970-01-01 00:00:00.004")), |
| Row.of(3, "c", Timestamp.valueOf("1970-01-01 00:00:00.005")), |
| Row.of(4, "d", Timestamp.valueOf("1970-01-01 00:00:00.006")))); |
| tableEnv.executeSql( |
| "create table default_catalog.default_database.probe (" |
| + " x int," |
| + " y string," |
| + " rowtime timestamp(3)," |
| + " p as proctime()," |
| + " watermark for rowtime as rowtime) " |
| + "with ('connector'='COLLECTION','is-bounded' = 'false')"); |
| |
| tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); |
| tableEnv.executeSql( |
| String.format( |
| "create table build (" |
| + " x int, " |
| + " y string, " |
| + " z int, " |
| + " primary key(x,y) disable novalidate rely)" |
| + " tblproperties ('%s' = 'true', '%s'='5min')", |
| HiveOptions.STREAMING_SOURCE_ENABLE.key(), |
| HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL.key())); |
| |
| tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); |
| } |
| |
| @Test |
| void testProcTimeTemporalJoinHiveTable() throws Exception { |
| if (!HiveVersionTestUtil.HIVE_310_OR_LATER) { |
| return; |
| } |
| tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); |
| tableEnv.executeSql("insert into build values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)") |
| .await(); |
| |
| assertThatThrownBy( |
| () -> |
| tableEnv.executeSql( |
| "select p.x, p.y, b.z from " |
| + " default_catalog.default_database.probe as p " |
| + " join build for system_time as of p.p as b on p.x=b.x and p.y=b.y")) |
| .hasMessageContaining("Processing-time temporal join is not supported yet.") |
| .isInstanceOf(TableException.class); |
| } |
| |
| @Test |
| void testRowTimeTemporalJoinHiveTable() throws Exception { |
| if (!HiveVersionTestUtil.HIVE_310_OR_LATER) { |
| return; |
| } |
| tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); |
| tableEnv.executeSql("insert into build values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)") |
| .await(); |
| |
| // Streaming hive table does not support defines watermark |
| assertThatThrownBy( |
| () -> |
| tableEnv.executeSql( |
| "select p.x, p.y, b.z from " |
| + " default_catalog.default_database.probe as p " |
| + " join build for system_time as of p.rowtime as b on p.x=b.x and p.y=b.y")) |
| .hasMessageContaining( |
| "Event-Time Temporal Table Join requires both primary key" |
| + " and row time attribute in versioned table, but no row time attribute can be found.") |
| .isInstanceOf(ValidationException.class); |
| } |
| |
| @AfterAll |
| static void tearDown() { |
| if (!HiveVersionTestUtil.HIVE_310_OR_LATER) { |
| return; |
| } |
| tableEnv.executeSql("drop table build"); |
| |
| if (hiveCatalog != null) { |
| hiveCatalog.close(); |
| } |
| } |
| } |