| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.skip import SkipIf |
| |
| |
| class TestIcebergTable(ImpalaTestSuite): |
| """Tests related to Iceberg tables.""" |
| |
| @classmethod |
| def get_workload(cls): |
| return 'functional-query' |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestIcebergTable, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('table_format').file_format == 'parquet') |
| |
| def test_iceberg_negative(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-negative', vector, use_db=unique_database) |
| |
| def test_create_iceberg_tables(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-create', vector, use_db=unique_database) |
| |
| def test_alter_iceberg_tables(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database) |
| |
| @SkipIf.not_hdfs |
| def test_drop_incomplete_table(self, vector, unique_database): |
| """Test DROP TABLE when the underlying directory is deleted. In that case table |
| loading fails, but we should be still able to drop the table from Impala.""" |
| tbl_name = unique_database + ".synchronized_iceberg_tbl" |
| cat_location = "/test-warehouse/" + unique_database |
| self.client.execute("""create table {0} (i int) stored as iceberg |
| tblproperties('iceberg.catalog'='hadoop.catalog', |
| 'iceberg.catalog_location'='{1}')""".format(tbl_name, cat_location)) |
| self.hdfs_client.delete_file_dir(cat_location, True) |
| self.execute_query_expect_success(self.client, """drop table {0}""".format(tbl_name)) |
| |
| @SkipIf.not_hdfs |
| def test_insert_into_iceberg_table(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-insert', vector, use_db=unique_database) |
| |
| def test_describe_history(self, vector, unique_database): |
| self.run_test_case('QueryTest/iceberg-table-history', vector, use_db=unique_database) |
| |
| # Create a table with multiple snapshots and verify the table history. |
| tbl_name = unique_database + ".iceberg_multi_snapshots" |
| self.client.execute("""create table {0} (i int) stored as iceberg |
| tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name)) |
| result = self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name)) |
| result = self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name)) |
| result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name)) |
| assert(len(result.data) == 2) |
| first_snapshot = result.data[0].split("\t") |
| second_snapshot = result.data[1].split("\t") |
| # Check that first snapshot is older than the second snapshot. |
| assert(first_snapshot[0] < second_snapshot[0]) |
| # Check that second snapshot's parent ID is the snapshot ID of the first snapshot. |
| assert(first_snapshot[1] == second_snapshot[2]) |
| # The first snapshot has no parent snapshot ID. |
| assert(first_snapshot[2] == "NULL") |
| # Check "is_current_ancestor" column. |
| assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE") |